Recurrent Neural Network RNN Ⅲ
さくら VPS RNN 豪ドル/円予測Ⅲ
Win11 WSL2 Ubuntu Python RNN--WSL2 Python RNN 予測ファイルまとめ
で作成した 5000本の大容量データを使用した
WSL2 Python RNN 予測ファイル keras-go-long-4.py
をさくら VPS のなかで動かしたいと思います。
さくら VPS のなかで keras-go-long-4.py が動作するように改造します。
- 予測チャ-ト保存
でき上がった予測チャ-ト図を保存します。
保存場所はフルパスで指示します。
plt.savefig(
'/home/yamada/public_html/manep-img/mane_chart_go_keras_long.png')
manep-img フォルダはあらかじめ作成しておいてください。 - その他 print( )文等をコメント化
デバッグのため print, plt.plot, plt.show 文を挿入していたのをコメント化します。
要するにサイレント化して実行するようにします。
さくら VPS RNN 豪ドル/円予測Ⅲファイル
さくら VPS で動作する RNN 豪ドル/円予測Ⅲファイルまとめます。
keras-go-long-4.py との差分は赤字にし、ファイル名は
keras-go-lnsk-4.py とします。
# -*- coding: utf-8 -*-
#!/usr/bin/python3
import os
os.environ['TF_CPP_MIN_LOG_LEVEL']='1'
import datetime
from datetime import datetime as dt
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from tensorflow import keras
from mplfinance.original_flavor import candlestick_ohlc
def addBusinessDays(from_date, add_days):
cur_date8 = [] # 21/03/30
cur_date4 = [] # 3/30
business_days_to_add = add_days
current_date = from_date
while business_days_to_add > 0:
current_date += datetime.timedelta(days=1)
weekday = current_date.weekday()
if weekday >= 5: # sunday = 6
continue
#土日は以下はスキップ
# 年/月/日として文字列にする
#例 21/03/30
e11 = current_date.strftime("%y/%m/%d")
e22 = e11.replace("/0", "/")
e33 = e22[3:] # 21/ 削除
cur_date8.append(e22)
cur_date4.append(e33)
business_days_to_add -= 1
return cur_date8, cur_date4
xl_df = pd.read_csv(
"/home/yamada/public_html/manep-w/mane_chart_go_long.csv", encoding="cp932")
Open = xl_df["始値(売り)"].values
High = xl_df["高値(売り)"].values
Low = xl_df["安値(売り)"].values
Close = xl_df["終値(売り)"].values
Date = xl_df["日付"].values
Idx = xl_df.index
tstr = Date[-1]
raw_data0 = Close.copy()
raw_data = Close.copy()
Close1 = Close.copy()
tdatetime = dt.strptime(tstr, '%Y/%m/%d %H:%M:%S')
# 文字列をdatetimeに変換するのがstrptime()関数
# datetime.datetime.strptime(文字列, 書式指定文字列)
lastday = addBusinessDays(tdatetime, 5)
Date_100 = Date[-100:]
xDate = []
xD = []
for i, key in enumerate(Date_100):
if(i % 10 == 0):
e4 = str(key)[4:10]
e6 = e4.replace("/0", "/")
e8 = e6.lstrip("/")
xDate.append(e8)
xD.append(i)
xDate.append(lastday[1][4])
xD.append(i + 5)
#5日間の予測日の追加
# 行列の平均、標準偏差を求めます。
mean = np.mean(raw_data)
# print("Mean", mean)
raw_data -= mean
std = np.std(raw_data)
# print("Std ",std)
# 標準偏差値に変換
raw_data /= std
# print("各標準偏差値")
# print(raw_data)
# for d_a_s in raw_data:
# print(d_a_s)
raw_data_100 = raw_data0[-100:]
# print("raw_data_100")
# print(raw_data_100)
# ラスト100個の行列の平均、標準偏差を求めます。
mean_100 = np.mean(raw_data_100)
# print("Mean_100", mean_100)
raw_data_100 -= mean_100
std_100 = np.std(raw_data_100)
# print("Std_100 ",std_100)
# 標準偏差値に変換
raw_data_100 /= std_100
# print("テストデータ各標準偏差値 std_100")
# print(raw_data_100)
# 連続デ-タとする。一つおきは、2
sampling_rate = 1
# 過去20間隔デ-タをひとまとまりとして時系列予測する
sequence_length = 20
delay = sampling_rate * sequence_length
# print("delay:", delay)
batch_size = 32 # 適当
# 検証デ-タのスタ-ト値
# num_half_samples = int(0.5 * len(raw_data))
# num_half_samples = Idx[-100]
train_dataset = keras.utils.timeseries_dataset_from_array(
# raw_data[:-delay],
raw_data,
targets=raw_data[delay:],
sampling_rate=sampling_rate,
sequence_length=sequence_length,
batch_size=batch_size,
)
val_dataset = keras.utils.timeseries_dataset_from_array(
# start_index を使いかつ targets 指定ある時、最後までは[:-1]が必要
# raw_data[:-1],
raw_data_100,
targets=raw_data_100[delay:],
sampling_rate=sampling_rate,
sequence_length=sequence_length,
batch_size=batch_size,
# start_index = num_half_samples,
)
test_dataset = keras.utils.timeseries_dataset_from_array(
raw_data_100,
targets=None,
sampling_rate=sampling_rate,
sequence_length=sequence_length,
batch_size=batch_size,
# start_index = num_half_samples,
)
'''以下削除
# numpy ndarray 配列に変換して表示
# 訓練デ-タセット表示
itr = 0
for samples, targets in train_dataset:
samples_n = samples.numpy()
targets_n = targets.numpy()
if itr == 0:
print("Start in-train:", samples_n[0])
print("Start tar-train:", targets_n[0])
itr = itr + 1
print("End in-train:", samples_n[-1])
print("End tar-train:", targets_n[-1])
# 検証デ-タセット表示
itv = 0
for samples_v, targets_v in val_dataset:
samples_vn = samples_v.numpy()
targets_vn = targets_v.numpy()
if itv == 0:
print("Start in-val:", samples_vn[0])
print("Start tar-val:", targets_vn[0])
itv = itv + 1
print("End in-val:", samples_vn[-1])
print("End tar-val:", targets_vn[-1])
# ここまで削除
'''
# テストデ-タセット表示
i = 0
for inputs_t in test_dataset:
inputs_n = inputs_t.numpy()
if i == 0:
# print("Start test:", inputs_n[0])
i = i + 1
# print("End test:", inputs_n[-1])
# from tensorflow import keras
from keras import layers
from keras import initializers
sequence_length = 20
inputs = keras.Input(shape=(sequence_length,))
x = layers.Flatten()(inputs)
x = layers.Dense(
20,
activation="tanh",
kernel_initializer='zeros'
)(x)
outputs = layers.Dense(1)(x)
model = keras.Model(inputs, outputs)
# print("モデルア-キテクチャ")
# print(model.summary())
callbacks_list = [
keras.callbacks.EarlyStopping(
monitor="val_loss",
patience=2,
),
keras.callbacks.ModelCheckpoint(
# "drive/MyDrive/Colab Notebooks/my_data/jena_dense.keras",
"/home/yamada/public_html/colab/jena_dense_l.keras",
save_best_only=True,
)
]
model.compile(optimizer="rmsprop", loss="mse", metrics=["mae"])
history = model.fit(
train_dataset,
epochs=40,
verbose=0,
validation_data=val_dataset,
callbacks=callbacks_list)
loss = history.history["mae"] # 平均絶対誤差(MAE)
val_loss = history.history["val_mae"]
epochs = range(1, len(loss) + 1)
# plt.plot(epochs, loss, "bo", label="Training MAE")
# plt.plot(epochs, val_loss, "b", label="Validation MAE")
# plt.title("Training and Validation MAE")
# plt.legend()
# plt.show()
model = keras.models.load_model(
"/home/yamada/public_html/colab/jena_dense_l.keras")
pre = model.predict(test_dataset, verbose=0)
pre1 = np.reshape(pre, (-1))
# print(f"Test 予測値:")
# print(pre1)
future_test = inputs_t[-1:]
# print("future_test 最初の配列値")
# print(future_test)
future_result = []
for i in range(5):
test_data_f = np.reshape(future_test, (1, 20, 1))
batch_predict = model.predict(test_data_f, verbose=0)
future_test = np.delete(future_test, 0)
future_test = np.append(future_test, batch_predict)
future_result = np.append(future_result, batch_predict)
# print("future_result :")
# print(future_result)
# len_raw_data = len(raw_data)
len_raw_data = 100
# xx1 = np.arange(len_raw_data - 80, len_raw_data + 1)
xx1 = np.arange(sequence_length, len_raw_data + 1)
# print(xx1)
xx3 = np.arange(len_raw_data, len_raw_data + 5)
# print(xx3)
# plt.plot(xx1, pre)
# plt.plot(xx3, future_result)
# plt.show()
pre_chg = pre.copy()
pre_chg *= std_100
pre_chg += mean_100
pre_chg1 = np.reshape(pre_chg, (-1))
# print("pre_chg1:")
# print(pre_chg1)
f_result = future_result.copy()
f_result *= std_100
f_result += mean_100
# print("f_result:", f_result)
Idx_100 = Idx[-100:] - Idx[-100]
# print("Idx_100")
# print(Idx_100)
Close_100 = Close1[-100:]
# plt.plot(Idx_100, Close_100)
# plt.plot(xx1, pre_chg)
# plt.plot(xx3, f_result)
# plt.show()
Open_100 = Open[-100:]
High_100 = High[-100:]
Low_100 = Low[-100:]
ohlc = zip(
Idx_100, Open_100, High_100, Low_100, Close_100)
fig = plt.figure(
figsize=(8.34, 5.56))
# python スクリプトと Jupyter とでは、
# matplotlib の図のサイズが違うので注意
ax = fig.add_subplot(1,1,1)
ax.grid()
# 解析結果表示はここに挿入
plt.plot(xx1, pre_chg)
plt.plot(xx3, f_result, 'bo')
# ----------------
candlestick_ohlc(
ax, ohlc, width=0.5, alpha = 1,
colorup='r', colordown='g')
plt.xticks(xD, xDate)
plt.title('AUS$ / JPY chart')
plt.xlabel('Date')
plt.ylabel('Yen')
plt.savefig(
# 'colab_mane_chart_go_keras.png')
'/home/yamada/public_html/manep-img/mane_chart_go_keras_long.png')
# plt.show()
# print("予測値=最終日+1~+5日")
valhe = np.round(f_result, 3)#3桁まで表示
valhe_pd = pd.DataFrame(valhe)
# pandas concat 関数で横(列)方向へ連結する、axis=1 を忘れないこと
lastday_pd = pd.DataFrame(lastday[0])
df_concat = pd.concat([lastday_pd, valhe_pd], axis = 1)
# print(df_concat)
df_concat.to_csv(
'/home/yamada/public_html/manep-img/mane_chart_go_keras_long.csv',
header=False, index=False)
以上で変更作業は完了です。
引き続き
RNN 豪ドル/円予測Ⅲファイル keras-go-lnsk-4.py
を確認します。