admin管理员组文章数量:1208153
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import pandas as pd
import yfinance as yf
import numpy as np
import asyncio
import tensorflow as tf
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import os
from sklearn.utils import class_weight
from tensorflow.keras.callbacks import EarlyStopping
from imblearn.over_sampling import SMOTE
from sklearn.inspection import permutation_importance
import pandas_ta as ta
from tensorflow.keras.regularizers import l2
from imblearn.under_sampling import RandomUnderSampler
ticker = "DOGE-USD"
SUPPORT_RESISTANCE_WINDOW = 24
features = ['RSI', 'MACD', 'Signal_Line', 'Deviation_SMA_20', 'Deviation_SMA_50', 'Price_Change',
'MACD_Ratio', 'ATR', 'Volume_Change', 'Volume_to_Mean', 'Close_to_SMA', 'High-Low',
'ADX', 'VWAP', 'Log_Volume', 'Close_to_Resistance', 'Close_to_Support',
'Hour', 'DayOfWeek', 'Volume_to_Range', 'Close_to_VWAP']
input_shape = len(features)
model = tf.keras.Sequential([
tf.keras.layers.Input(shape=(len(features), )),
tf.keras.layers.Dense(128, activation='relu', kernel_regularizer=l2(0.01)),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.Dense(64, activation='relu', kernel_regularizer=l2(0.01)),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.Dense(32, activation='relu', kernel_regularizer=l2(0.01)),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.Dense(1, activation='sigmoid')
])
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
modelpile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy', 'AUC'])
early_stopping = EarlyStopping(monitor='loss', patience=5, restore_best_weights=True)
weights_file = "market_direction_model.weights.h5"
if os.path.exists(weights_file):
try:
print("Загружаем сохранённые веса модели...")
model.load_weights(weights_file)
except ValueError as e:
print(f"Ошибка загрузки весов: {e}. Старые веса удалены. Модель будет обучена с нуля.")
os.remove(weights_file)
else:
print("Сохранённые веса не найдены. Модель будет обучена с нуля.")
def calculate_indicators(data):
data.columns = data.columns.get_level_values(0)
def calculate_rsi(data, window=14):
delta = data['Close'].diff(1)
gain = delta.where(delta > 0, 0)
loss = -delta.where(delta < 0, 0)
avg_gain = gain.rolling(window=window, min_periods=1).mean()
avg_loss = loss.rolling(window=window, min_periods=1).mean()
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
return rsi
data['RSI'] = calculate_rsi(data)
short_ema = data['Close'].ewm(span=9, adjust=False).mean()
long_ema = data['Close'].ewm(span=21, adjust=False).mean()
data['MACD'] = short_ema - long_ema
data['Signal_Line'] = data['MACD'].ewm(span=9, adjust=False).mean()
data['SMA_20'] = data['Close'].rolling(window=20).mean()
data['SMA_50'] = data['Close'].rolling(window=50).mean()
rolling_std = data['Close'].rolling(window=20).std()
data['BB_Upper'] = data['SMA_20'] + (2 * rolling_std)
data['BB_Lower'] = data['SMA_20'] - (2 * rolling_std)
data['High-Low'] = data['High'] - data['Low']
data['High-Close'] = abs(data['High'] - data['Close'].shift(1))
data['Low-Close'] = abs(data['Low'] - data['Close'].shift(1))
data['True_Range'] = data[['High-Low', 'High-Close', 'Low-Close']].max(axis=1)
data['ATR'] = data['True_Range'].rolling(window=14).mean()
data['Deviation_SMA_20'] = data['Close'] - data['SMA_20']
data['Deviation_SMA_50'] = data['Close'] - data['SMA_50']
data['Price_Change'] = (data['Close'] - data['Open']) / data['Open']
data['MACD_Ratio'] = data['MACD'] / (data['Signal_Line'] + 1e-10)
data['Volume_Change'] = data['Volume'].pct_change()
data['Volume_to_Mean'] = data['Volume'] / data['Volume'].rolling(window=20).mean()
data['Close_to_SMA'] = data['Close'] / data['SMA_20']
data['ADX'] = ta.adx(data['High'], data['Low'], data['Close'], window=14)['ADX_14']
data['VWAP'] = ta.vwap(data['High'], data['Low'], data['Close'], data['Volume'])
return data
def generate_signals(data):
data = data.dropna().copy()
data['Target'] = (data['Close'].shift(-1) > data['Close']).astype(int) # Целевая переменная: 1, если цена выросла
return data
async def fetch_data():
data = yf.download(ticker, period="1y", interval="1h")
data = data.reset_index()
return data
def clean_data(data):
# Заменить бесконечные значения на NaN
data.replace([np.inf, -np.inf], np.nan, inplace=True)
# Удалить строки с NaN
data.dropna(inplace=True)
return data
print("Загружаем данные для обучения...")
data = asyncio.run(fetch_data())
data = data.set_index('Datetime')
data = calculate_indicators(data)
data = generate_signals(data)
data = clean_data(data)
print(data['Target'].value_counts())
print(data['Target'].value_counts(normalize=True))
calculated_features = [
'RSI', 'MACD', 'Signal_Line', 'Deviation_SMA_20', 'Deviation_SMA_50',
'Price_Change', 'MACD_Ratio', 'ATR', 'Volume_Change', 'Volume_to_Mean',
'Close_to_SMA', 'High-Low', 'ADX', 'VWAP'
]
missing_features = [f for f in calculated_features if f not in data.columns]
if missing_features:
print(f"Следующие признаки отсутствуют после расчёта: {missing_features}")
raise KeyError(f"Отсутствуют признаки: {missing_features}")
features = [f for f in calculated_features if f in data.columns]
X = data[features].values
y = data['Target'].values
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
data_with_target = data.copy()
data_with_target['Target'] = y
correlation = data[features + ['Target']].corr()['Target'].abs()
threshold = 0.01
filtered_features = correlation[correlation > threshold].index.tolist()
if 'Target' in filtered_features:
filtered_features.remove('Target')
features = filtered_features
print(f"Оставшиеся признаки: {features}")
if isinstance(correlation, pd.Series):
print(correlation.sort_values(ascending=False))
else:
print("Корреляция:", correlation)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
class_weights = class_weightpute_class_weight('balanced', classes=np.unique(y_train), y=y_train)
class_weights = dict(enumerate(class_weights))
def apply_undersampling(X, y):
rus = RandomUnderSampler(random_state=42)
X_resampled, y_resampled = rus.fit_resample(X, y)
return X_resampled, y_resampled
X_train_resampled, y_train_resampled = apply_undersampling(X_train_scaled, y_train)
print(f"Размер данных после Undersampling: {X_train_resampled.shape}, {y_train_resampled.shape}")
print(pd.DataFrame(X_train_scaled).describe())
best_accuracy = 0
while best_accuracy < 0.6: # Условие: точность должна быть больше 75%
print("Обучение модели...")
history = model.fit(
X_train_resampled, y_train_resampled,
validation_data=(X_test_scaled, y_test),
epochs=100, # Сокращение числа эпох для стабильности
batch_size=32,
verbose=1,
class_weight=class_weights, # Взвешивание классов
callbacks=[early_stopping] # Добавление ранней остановки
)
results = model.evaluate(X_test_scaled, y_test, verbose=1)
loss = results[0]
accuracy = results[1]
auc = results[2] # Если третья метрика — AUC
print(f"Точность на тестовом наборе: {accuracy:.4f}, Потери: {loss:.4f}, AUC: {auc:.4f}")
if accuracy > best_accuracy:
best_accuracy = accuracy
model.save_weights(weights_file)
print(f"Новая лучшая точность: {best_accuracy:.4f}. Веса модели сохранены.")
print("Модель успешно обучена.")
def predict_direction(data):
features = data[features].iloc[-1:].values
features_scaled = scaler.transform(features)
prediction = model.predict(features_scaled)
return "Вверх" if prediction[0][0] > 0.5 else "Вниз"
async def update(frame):
global data
print("Обновляем данные...")
new_data = await fetch_data()
new_data = new_data.set_index('Datetime')
data = calculate_indicators(new_data)
data = generate_signals(data)
market_direction = predict_direction(data)
ax.clear()
ax.plot(data.index, data['Close'], label=f"Цена {ticker}", color="blue", linewidth=2.5)
ax.plot(data.index, data['BB_Upper'], label="Верхняя Bollinger Band", color="green", linestyle="--")
ax.plot(data.index, data['BB_Lower'], label="Нижняя Bollinger Band", color="red", linestyle="--")
ax.plot(data.index, data['SMA_20'], label="SMA 20", color="purple", linestyle="--")
ax.plot(data.index, data['SMA_50'], label="SMA 50", color="yellow", linestyle="--")
ax.text(0.02, 0.95, f"Прогноз: {market_direction}", transform=ax.transAxes,
fontsize=12, verticalalignment='top', bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
ax.set_title(f"Прогноз для {ticker} (1 час)")
ax.set_xlabel("Дата")
ax.set_ylabel("Цена (USD)")
ax.legend(loc='upper left')
ax.grid()
fig, ax = plt.subplots(figsize=(12, 8))
ani = FuncAnimation(fig, lambda frame: asyncio.run(update(frame)), interval=3600000, cache_frame_data=False)
plt.tight_layout()
plt.plot(history.history['loss'], label='Loss')
plt.plot(history.history['accuracy'], label='Accuracy')
plt.plot(history.history['auc'], label='AUC')
plt.legend()
plt.show()
After a huge number of eras, the choice is still at the level of randomness, how can we prevent it?
I tried to increase the number of features, but with the help of correlation I removed the unnecessary ones, I also tried to decrease the number of neurons, changed the optimizer, increased the timeframe to get a larger dataset
本文标签:
版权声明:本文标题:tensorflow2.0 - Implementation of artificial intelligence in Python to analyze the market and further forecasts for ups and down 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1738761833a2111007.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论