Recurrent Neural Network RNN Ⅲ

さくら VPS RNN 豪ドル/円4時間足大容量データ予測

Win11 WSL2 Ubuntu Python RNN Ⅱ--RNN 豪ドル/円4時間足大容量データ予測ファイルまとめ
で作成した 5000本の大容量データを使用した
RNN 豪ドル/円4時間足大容量データ予測ファイル keras-go-long-4h.py
をさくら VPS のなかで動かしたいと思います。
さくら VPS のなかで keras-go-long-4h.py が動作するように改造します。

  1. 予測チャ-ト保存

    でき上がった予測チャ-ト図を保存します。
    保存場所はフルパスで指示します。
    plt.savefig(
    '/home/yamada/public_html/manep-img/mane_chart_go_keras_long_4h.png')
    manep-img フォルダはあらかじめ作成しておいてください。

  2. 各デ-タセット表示削除

    訓練、検証デ-タセット表示は特に必要ないので削除します。

  3. その他 print( )文等をコメント化

    デバッグのため print, plt.plot, plt.show 文を挿入していたのをコメント化します。
    要するにサイレント化して実行するようにします。

  4. verbose = 0 有効にする

    verbose = 0 とすると進捗状況報告なしとなります。

    history = model.fit(
      train_dataset,
      epochs=40,
      verbose=0,
      validation_data=val_dataset,
      callbacks=callbacks_list)
     ・・・・・・・・・・・・・・・・・・・・・・・・・・・・・・
    for i in range(6):
      test_data_f = np.reshape(future_test, (1, 30, 1))
      batch_predict = model.predict(test_data_f, verbose=0)
      future_test = np.delete(future_test, 0)
      future_test = np.append(future_test, batch_predict)
      future_result = np.append(future_result, batch_predict) 
    

さくら VPS RNN 豪ドル/円4時間足大容量データ予測ファイル

さくら VPS で動作する RNN 豪ドル/円4時間足大容量データ予測ファイルまとめます。
keras-go-long-4h.py との差分は赤字にし、ファイル名は
keras-go-lnsk-4h.py とします。


import os
# 1:INFOメッセージが出ないようにするs
os.environ['TF_CPP_MIN_LOG_LEVEL']='1'
import datetime
from datetime import datetime as dt
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from tensorflow import keras
from mplfinance.original_flavor import candlestick_ohlc

def addBusinessDays_4h(current_date, business_days_to_add_h):
# current_date : 処理する初日の日時 例) 2022-02-03 00:00:00
# business_days_to_add_h : 4時間ずつ延長する回数 
  cur_date8 = [] # 21/03/30 12~
  cur_date4 = [] # 3/30
  man_f = 1 #月曜フラグ 0 or 1
  doyo_f = 0  #土曜フラグ 0 or 1
  current_date0 = current_date
  weekday0 = current_date0.weekday()
  if(weekday0 == 0): #月
    man_f = 0
    
  while business_days_to_add_h > 0:
    current_date += datetime.timedelta(hours=4)
    weekday = current_date.weekday()
    hour_doyo = current_date.hour
    if weekday == 6: # sunday = 6
      continue
    if((weekday == 5) and (hour_doyo == 8)): #土
      doyo_f = 1
    if((weekday == 5) and (doyo_f == 1)): #土
      continue
        
    if((weekday == 0) and (man_f == 1)): #月
      man_f = 0 
      continue
  # 年/月/日 時刻として文字列にする処理
  # 例 22/03/30 12~     
    e11 = current_date.strftime("%y/%m/%d %H")
    e22 = e11.replace("/0", "/")
    e22 = e22 + "~"
    e33 = e22[3:] # 22/ 削除
    cur_date8.append(e22)
    cur_date4.append(e33)
    business_days_to_add_h -= 1
  return cur_date8, cur_date4

xl_df = pd.read_csv(
  "/home/yamada/public_html/manep-w/mane_chart_go_long_4h.csv",
  encoding="cp932")

Open = xl_df["始値(売り)"].values
High = xl_df["高値(売り)"].values
Low = xl_df["安値(売り)"].values
Close = xl_df["終値(売り)"].values
Date = xl_df["日付"].values
Idx = xl_df.index
tstr = Date[-1]
raw_data0 = Close.copy()
raw_data = Close.copy()
Close1 = Close.copy()
tdatetime = dt.strptime(tstr, '%Y/%m/%d %H:%M:%S')
# 文字列をdatetimeに変換するのがstrptime()関数
# datetime.datetime.strptime(文字列, 書式指定文字列)
# lastday = addBusinessDays(tdatetime, 5)
lastday = addBusinessDays_4h(tdatetime, 6)

Date_100 = Date[-100:]
xDate = []
xD = []
dayf = 0
for i, key in enumerate(Date_100):
  day00 = str(key)[11:19]
  if((dayf == 1) and (day00 == '00:00:00')):
    dayf = 0
  else:
    if((dayf == 0) and (day00 == '00:00:00')):
      e4 = str(key)[4:10]
      e6 = e4.replace("/0", "/")
      e8 = e6.lstrip("/")
      xDate.append(e8)
      xD.append(i)
      dayf = 1

if(dayf == 0):
  e4l = tstr[7:14]
  e6l = e4l.replace("/0", "/")
  e8l = e6l.lstrip("/")
  xDate.append(e8l)
  xD.append(i)

# 行列の平均、標準偏差を求めます。
mean = np.mean(raw_data)
# print("Mean", mean)
raw_data -= mean
std = np.std(raw_data)
print("Std ",std)
# 標準偏差値に変換
raw_data /= std
# print("各標準偏差値")
# print(raw_data)

raw_data_100 = raw_data0[-100:]
# print("raw_data_100")
# print(raw_data_100)
# ラスト100個の行列の平均、標準偏差を求めます。
mean_100 = np.mean(raw_data_100)
# print("Mean_100", mean_100)
raw_data_100 -= mean_100
std_100 = np.std(raw_data_100)
# print("Std_100 ",std_100)
# 標準偏差値に変換
raw_data_100 /= std_100
# print("テストデータ各標準偏差値 std_100")
# print(raw_data_100)
# 連続デ-タとする。一つおきは、2
sampling_rate = 1
# 過去20間隔デ-タをひとまとまりとして時系列予測する
sequence_length = 30
delay = sampling_rate * sequence_length
# print("delay:", delay)
batch_size = 32  # 適当
# 検証デ-タのスタ-ト値
# num_half_samples = int(0.5 * len(raw_data))
train_dataset = keras.utils.timeseries_dataset_from_array(
  raw_data,
  targets=raw_data[delay:],
  sampling_rate=sampling_rate,
  sequence_length=sequence_length,
  batch_size=batch_size,
)
val_dataset = keras.utils.timeseries_dataset_from_array(
  # raw_data[:-1],
  raw_data_100,
  targets=raw_data_100[delay:],
  sampling_rate=sampling_rate,
  sequence_length=sequence_length,
  batch_size=batch_size,
  # start_index=num_half_samples,
)
test_dataset = keras.utils.timeseries_dataset_from_array(
  raw_data_100,
  targets=None,
  sampling_rate=sampling_rate,
  sequence_length=sequence_length,
  batch_size=batch_size,
)
# numpy ndarray 配列に変換して表示
# 訓練デ-タセット表示
"""削除開始
itr = 0
for samples, targets in train_dataset:
  samples_n = samples.numpy()
  targets_n = targets.numpy()
  if itr == 0:
    print("Start in-train:", samples_n[0])
    print("Start tar-train:", targets_n[0])
    itr = itr + 1
print("End in-train:", samples_n[-1])
print("End tar-train:", targets_n[-1])
# 検証デ-タセット表示
itv = 0
for samples_v, targets_v in val_dataset:
  samples_vn = samples_v.numpy()
  targets_vn = targets_v.numpy()
  if itv == 0:
    print("Start in-val:", samples_vn[0])
    print("Start tar-val:", targets_vn[0])
    itv = itv + 1
print("End in-val:", samples_vn[-1])
print("End tar-val:", targets_vn[-1])
ここまで削除
"""
# テストデ-タセット表示
i = 0
for inputs_t in test_dataset:
  inputs_n = inputs_t.numpy()
  if i == 0:
    # print("Start test:", inputs_n[0])
    i = i + 1
# print("End test:", inputs_n[-1])

from keras import layers
from keras import initializers

inputs = keras.Input(shape=(sequence_length,))
x = layers.Flatten()(inputs)
x = layers.Dense(
    30,
    activation="tanh",
    kernel_initializer='zeros'
    )(x)
outputs = layers.Dense(1)(x)
model = keras.Model(inputs, outputs)
# print("モデルア-キテクチャ")
# print(model.summary())

callbacks_list = [
  keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=2,
    ),
  keras.callbacks.ModelCheckpoint(
    "/home/yamada/public_html/colab/jena_dense_l_4h.keras",
    save_best_only=True,
    )
]
model.compile(optimizer="rmsprop", loss="mse", metrics=["mae"])
history = model.fit(
  train_dataset,
  epochs=40,
  verbose=0,
  validation_data=val_dataset,
  callbacks=callbacks_list)
  
loss = history.history["mae"] # 平均絶対誤差(MAE)
val_loss = history.history["val_mae"]
epochs = range(1, len(loss) + 1)

# plt.plot(epochs, loss, "bo", label="Training MAE")
# plt.plot(epochs, val_loss, "b", label="Validation MAE")
# plt.title("Training and Validation MAE")
# plt.legend()
# plt.show()

model = keras.models.load_model(
  "/home/yamada/public_html/colab/jena_dense_l_4h.keras")
pre = model.predict(test_dataset, verbose=0)
# verbose=0:進捗状況報告なし
pre1 = np.reshape(pre, (-1))
# print(f"Test 予測値:")
# print(pre1)

future_test = inputs_t[-1:]
print("future_test 最初の配列値")
print(future_test)
future_result = []
for i in range(6):
  test_data_f = np.reshape(future_test, (1, 30, 1))
  batch_predict = model.predict(test_data_f, verbose=0)
  future_test = np.delete(future_test, 0)
  future_test = np.append(future_test, batch_predict)
  future_result = np.append(future_result, batch_predict)
# print("future_result :")
# print(future_result)
# len_raw_data = len(raw_data)
len_raw_data = 100
xx1 = np.arange(sequence_length, len_raw_data + 1)
# print(xx1)
xx3 = np.arange(len_raw_data, len_raw_data + 6)
# print(xx3)
# plt.plot(xx1, pre)
# plt.plot(xx3, future_result)
# plt.show()

pre_chg = pre.copy()
pre_chg *= std_100
pre_chg += mean_100
pre_chg1 = np.reshape(pre_chg, (-1))
# print("pre_chg1:")
# print(pre_chg1)

f_result = future_result.copy()
f_result *= std_100
f_result += mean_100
# print("f_result:", f_result)
Idx_100 = Idx[-100:] - Idx[-100]
# print("Idx_100")
# print(Idx_100)
Close_100 = Close[-100:]
# plt.plot(Idx_100, Close_100)
# plt.plot(xx1, pre_chg)
# plt.plot(xx3, f_result)
# plt.show()
Open_100 = Open[-100:]
High_100 = High[-100:]
Low_100 = Low[-100:]
ohlc = zip(
Idx_100, Open_100, High_100, Low_100, Close_100)

fig = plt.figure(
 figsize=(8.34, 5.56))
# python スクリプトと Jupyter とでは、
# matplotlib の図のサイズが違うので注意
ax = fig.add_subplot(1,1,1)
ax.grid()
# 解析結果表示はここに挿入
plt.plot(xx1, pre_chg)
plt.plot(xx3, f_result, 'bo')
candlestick_ohlc(
 ax, ohlc, width=0.5, alpha = 1,
 colorup='r', colordown='g')
plt.xticks(xD, xDate)
plt.title('AUS$ / JPY chart')
plt.xlabel('Date')
plt.ylabel('Yen')
plt.savefig(
 '/home/yamada/public_html/manep-img/mane_chart_go_keras_long_4h.png')
# plt.show()
# print("予測値=最終日時 +1~+6")
valhe = np.round(f_result, 3)#3桁まで表示
valhe_pd = pd.DataFrame(valhe)
# pandas concat 関数で横(列)方向へ連結する、axis=1 を忘れないこと
lastday_pd = pd.DataFrame(lastday[0])
df_concat = pd.concat([lastday_pd, valhe_pd], axis = 1)
# print(df_concat)
df_concat.to_csv(
 '/home/yamada/public_html/manep-img/mane_chart_go_keras_long_4h.csv',
 header=False, index=False)

以上で変更作業は完了です。
引き続き RNN 豪ドル/円4時間足大容量データ予測ファイル keras-go-lnsk-4h.py を確認します。


  • さくら VPS RNN 豪ドル/円4時間足大容量データ予測ファイル作成 に進む
  • RNN 豪ドル/円4時間足大容量データ予測ファイル動作確認Ⅱ に戻る
  • 70VPS に戻る