Win11 WSL2 Ubuntu Python RNN
WSL2 Python RNN 予測ファイルまとめ
ここまで、WSL2 Python RNN 予測ファイルについて小出しにその詳細を説明してきました。
引き続き、これを一つのファイル keras-go-long-4.py にまとめます。
Python Colaboratory Ⅱ--RNN 豪ドル/円予測ファイルⅡ
が元ファイルになります。
RNN 豪ドル/円予測ファイルⅡ keras-go-2.ipynb との差分は赤字で表示します。
import os # 1:INFOメッセージが出ないようにする os.environ['TF_CPP_MIN_LOG_LEVEL']='1' import datetime from datetime import datetime as dt import numpy as np import pandas as pd from matplotlib import pyplot as plt from tensorflow import keras from mplfinance.original_flavor import candlestick_ohlc def addBusinessDays(from_date, add_days): cur_date8 = [] # 21/03/30 cur_date4 = [] # 3/30 business_days_to_add = add_days current_date = from_date while business_days_to_add > 0: current_date += datetime.timedelta(days=1) weekday = current_date.weekday() if weekday >= 5: # sunday = 6 continue #土日は以下はスキップ # 年/月/日として文字列にする #例 21/03/30 e11 = current_date.strftime("%y/%m/%d") e22 = e11.replace("/0", "/") e33 = e22[3:] # 21/ 削除 cur_date8.append(e22) cur_date4.append(e33) business_days_to_add -= 1 return cur_date8, cur_date4 xl_df = pd.read_csv( "/home/yamada/public_html/manep-w/mane_chart_go_long.csv", encoding="cp932") Open = xl_df["始値(売り)"].values High = xl_df["高値(売り)"].values Low = xl_df["安値(売り)"].values Close = xl_df["終値(売り)"].values Date = xl_df["日付"].values Idx = xl_df.index tstr = Date[-1] raw_data0 = Close.copy() raw_data = Close.copy() Close1 = Close.copy() tdatetime = dt.strptime(tstr, '%Y/%m/%d %H:%M:%S') # 文字列をdatetimeに変換するのがstrptime()関数 # datetime.datetime.strptime(文字列, 書式指定文字列) lastday = addBusinessDays(tdatetime, 5) Date_100 = Date[-100:] xDate = [] xD = [] for i, key in enumerate(Date_100): if(i % 10 == 0): e4 = str(key)[4:10] e6 = e4.replace("/0", "/") e8 = e6.lstrip("/") xDate.append(e8) xD.append(i) xDate.append(lastday[1][4]) xD.append(i + 5) #5日間の予測日の追加 raw_data0 = xl_df["終値(売り)"].values print("raw_data0.shape:", raw_data0.shape) print("raw_data0") print(raw_data0) # plt.plot(range(len(raw_data0)), raw_data0) # plt.show() raw_data = raw_data0.copy() # 行列の平均、標準偏差を求めます。 mean = np.mean(raw_data) print("Mean", mean) raw_data -= mean std = np.std(raw_data) print("Std ",std) # 標準偏差値に変換 raw_data /= std print("各標準偏差値") print(raw_data) raw_data_100 = raw_data0[-100:] print("raw_data_100") print(raw_data_100) # ラスト100個の行列の平均、標準偏差を求めます。 mean_100 = np.mean(raw_data_100) print("Mean_100", mean_100) raw_data_100 -= mean_100 std_100 = np.std(raw_data_100) print("Std_100 ",std_100) # 標準偏差値に変換 raw_data_100 /= std_100 print("テストデータ各標準偏差値 std_100") print(raw_data_100) # 連続デ-タとする。一つおきは、2 sampling_rate = 1 # 過去20間隔デ-タをひとまとまりとして時系列予測する sequence_length = 20 delay = sampling_rate * sequence_length # print("delay:", delay) batch_size = 32 # 適当 # 検証デ-タのスタ-ト値 # num_half_samples = int(0.5 * len(raw_data)) train_dataset = keras.utils.timeseries_dataset_from_array( raw_data, targets=raw_data[delay:], sampling_rate=sampling_rate, sequence_length=sequence_length, batch_size=batch_size, ) val_dataset = keras.utils.timeseries_dataset_from_array( # raw_data[:-1], raw_data_100, targets=raw_data_100[delay:], sampling_rate=sampling_rate, sequence_length=sequence_length, batch_size=batch_size, # start_index=num_half_samples, ) test_dataset = keras.utils.timeseries_dataset_from_array( raw_data_100, targets=None, sampling_rate=sampling_rate, sequence_length=sequence_length, batch_size=batch_size, ) # numpy ndarray 配列に変換して表示 # 訓練デ-タセット表示 itr = 0 for samples, targets in train_dataset: samples_n = samples.numpy() targets_n = targets.numpy() if itr == 0: print("Start in-train:", samples_n[0]) print("Start tar-train:", targets_n[0]) itr = itr + 1 print("End in-train:", samples_n[-1]) print("End tar-train:", targets_n[-1]) # 検証デ-タセット表示 itv = 0 for samples_v, targets_v in val_dataset: samples_vn = samples_v.numpy() targets_vn = targets_v.numpy() if itv == 0: print("Start in-val:", samples_vn[0]) print("Start tar-val:", targets_vn[0]) itv = itv + 1 print("End in-val:", samples_vn[-1]) print("End tar-val:", targets_vn[-1]) # テストデ-タセット表示 i = 0 for inputs_t in test_dataset: inputs_n = inputs_t.numpy() if i == 0: print("Start test:", inputs_n[0]) i = i + 1 print("End test:", inputs_n[-1]) from keras import layers from keras import initializers sequence_length = 20 inputs = keras.Input(shape=(sequence_length,)) x = layers.Flatten()(inputs) x = layers.Dense( 20, activation="tanh", kernel_initializer='zeros' )(x) outputs = layers.Dense(1)(x) model = keras.Model(inputs, outputs) print("モデルア-キテクチャ") print(model.summary()) callbacks_list = [ keras.callbacks.EarlyStopping( monitor="val_loss", patience=2, ), keras.callbacks.ModelCheckpoint( "/home/yamada/public_html/colab/jena_dense_l.keras", save_best_only=True, ) ] model.compile(optimizer="rmsprop", loss="mse", metrics=["mae"]) history = model.fit( train_dataset, epochs=40, # verbose=0, validation_data=val_dataset, callbacks=callbacks_list) loss = history.history["mae"] # 平均絶対誤差(MAE) val_loss = history.history["val_mae"] epochs = range(1, len(loss) + 1) plt.plot(epochs, loss, "bo", label="Training MAE") plt.plot(epochs, val_loss, "b", label="Validation MAE") plt.title("Training and Validation MAE") plt.legend() plt.show() model = keras.models.load_model( "/home/yamada/public_html/colab/jena_dense_l.keras") pre = model.predict(test_dataset, verbose=0) # verbose=0:進捗状況報告なし pre1 = np.reshape(pre, (-1)) print(f"Test 予測値:") print(pre1) future_test = inputs_t[-1:] print("future_test 最初の配列値") print(future_test) future_result = [] for i in range(5): test_data_f = np.reshape(future_test, (1, 20, 1)) batch_predict = model.predict(test_data_f) future_test = np.delete(future_test, 0) future_test = np.append(future_test, batch_predict) future_result = np.append(future_result, batch_predict) # print("future_result :") # print(future_result) # len_raw_data = len(raw_data) len_raw_data = 100 xx1 = np.arange(sequence_length, len_raw_data + 1) print(xx1) xx3 = np.arange(len_raw_data, len_raw_data + 5) print(xx3) plt.plot(xx1, pre) plt.plot(xx3, future_result) plt.show() pre_chg = pre.copy() pre_chg *= std_100 pre_chg += mean_100 pre_chg1 = np.reshape(pre_chg, (-1)) # print("pre_chg1:") # print(pre_chg1) f_result = future_result.copy() f_result *= std_100 f_result += mean_100 # print("f_result:", f_result) Idx_100 = Idx[-100:] - Idx[-100] print("Idx_100") print(Idx_100) Close_100 = Close[-100:] plt.plot(Idx_100, Close_100) plt.plot(xx1, pre_chg) plt.plot(xx3, f_result) plt.show() Open_100 = Open[-100:] High_100 = High[-100:] Low_100 = Low[-100:] ohlc = zip( Idx_100, Open_100, High_100, Low_100, Close_100) fig = plt.figure( figsize=(8.34, 5.56)) # python スクリプトと Jupyter とでは、 # matplotlib の図のサイズが違うので注意 ax = fig.add_subplot(1,1,1) ax.grid() # 解析結果表示はここに挿入 plt.plot(xx1, pre_chg) plt.plot(xx3, f_result, 'bo') candlestick_ohlc( ax, ohlc, width=0.5, alpha = 1, colorup='r', colordown='g') plt.xticks(xD, xDate) plt.title('AUS$ / JPY chart') plt.xlabel('Date') plt.ylabel('Yen') # plt.savefig( # '/home/yamada/public_html/manep-img/mane_chart_go_keras_long.png') plt.show() print("予測値=最終日+1~+5日") valhe = np.round(f_result, 3)#3桁まで表示 valhe_pd = pd.DataFrame(valhe) # pandas concat 関数で横(列)方向へ連結する、axis=1 を忘れないこと lastday_pd = pd.DataFrame(lastday[0]) df_concat = pd.concat([lastday_pd, valhe_pd], axis = 1) print(df_concat) df_concat.to_csv( '/home/yamada/public_html/manep-img/mane_chart_go_keras_long.csv', header=False, index=False)
ここまでで WSL2 Python RNN 予測ファイル keras-go-long-4.py がまとまりました。
引き続き、このファイルの動作確認をします。