Machine Learning - LSTM - State¶
This example shows how to use a neural network with state for writing a trading system on stocks.
You can clone and edit this example on the Quantiacs platform (Examples tab).
This example uses a Long Short Term Memory (LSTM) Neural Network to predict whether stock prices will rise or fall. We trade the top assets with the lowest volatility and use the previous day's weights to determine positions for the next day.
Important! Before further development, you need to run the ./init.py file once to install the PyTorch dependency.
!pip install torch==2.2.1
Key Features:¶
- Universe: NASDAQ-100 stocks
- Trading Logic: Long positions are taken based on the confidence level of the LSTM model’s predictions.
- Feature for Learning: Trend, calculated as the rate of change (ROC) of a linear weighted moving average (LWMA) of the closing price.
import xarray as xr
import qnt.data as qndata
import qnt.backtester as qnbt
import qnt.ta as qnta
import qnt.stats as qns
import qnt.graph as qngraph
import qnt.output as qnout
import qnt.filter as qnfilter
import qnt.exposure as qnexp
import numpy as np
import pandas as pd
import torch
from torch import nn, optim
import random
global_lookback_period = 155
global_train_period = 100
global_count_features_for_ml = 1
global_top_assets = 12
class LSTM(nn.Module):
"""
Class to define our LSTM network.
"""
def __init__(self, input_dim=3, hidden_layers=64):
super(LSTM, self).__init__()
self.hidden_layers = hidden_layers
self.lstm1 = nn.LSTMCell(input_dim, self.hidden_layers)
self.lstm2 = nn.LSTMCell(self.hidden_layers, self.hidden_layers)
self.linear = nn.Linear(self.hidden_layers, 1)
def forward(self, y):
outputs = []
n_samples = y.size(0)
h_t = torch.zeros(n_samples, self.hidden_layers, dtype=torch.float32)
c_t = torch.zeros(n_samples, self.hidden_layers, dtype=torch.float32)
h_t2 = torch.zeros(n_samples, self.hidden_layers, dtype=torch.float32)
c_t2 = torch.zeros(n_samples, self.hidden_layers, dtype=torch.float32)
for time_step in range(y.size(1)):
x_t = y[:, time_step, :] # Ensure x_t is [batch, input_dim]
h_t, c_t = self.lstm1(x_t, (h_t, c_t))
h_t2, c_t2 = self.lstm2(h_t, (h_t2, c_t2))
output = self.linear(h_t2)
outputs.append(output.unsqueeze(1))
outputs = torch.cat(outputs, dim=1).squeeze(-1)
return outputs
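A quick sanity check of the network's input and output shapes, a minimal sketch (the batch size, sequence length, and random tensor are purely illustrative):
model_check = LSTM(input_dim=1, hidden_layers=34)
dummy = torch.randn(1, 20, 1)  # [batch, time, features]
print(model_check(dummy).shape)  # torch.Size([1, 20]): one prediction per time step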
def get_model():
def set_seed(seed_value=42):
"""Set seed for reproducibility."""
random.seed(seed_value)
np.random.seed(seed_value)
torch.manual_seed(seed_value)
torch.cuda.manual_seed(seed_value)
torch.cuda.manual_seed_all(seed_value) # if you are using multi-GPU.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
set_seed(42)
model = LSTM(input_dim=global_count_features_for_ml, hidden_layers=34)
return model
def get_features(data):
close_price = data.sel(field="close").ffill('time').bfill('time').fillna(1)
close_price_lwma = qnta.lwma(close_price, 10)
trend = qnta.roc(close_price_lwma, 1).ffill('time').bfill('time').fillna(1)
features = xr.concat([trend], "feature")
return features
def get_target_classes(data):
price_current = data.sel(field='open')
price_future = qnta.shift(price_current, -1)
    class_positive = 1 # price goes up
class_negative = 0 # price goes down
target_price_up = xr.where(price_future > price_current, class_positive, class_negative)
return target_price_up
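To see what the target looks like, a minimal sketch on synthetic prices (the values are made up; the plain xarray shift mirrors qnta.shift(price, -1)):
prices = xr.DataArray([10.0, 11.0, 10.5], dims="time")
future = prices.shift(time=-1)  # next day's price; the last entry becomes NaN
target = xr.where(future > prices, 1, 0)
print(target.values)  # [1 0 0]: the final 0 appears because the future price is unknown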
def get_dynamic_stock_data(data):
is_liquid = data.sel(field="is_liquid")
# Get the assets that are liquid in the last time step
last_assets = is_liquid[-1]
is_liquid_asset_list = last_assets.where(last_assets > 0, drop=True).asset.values
# Select data for liquid assets
data_liquid = data.sel(asset=is_liquid_asset_list)
# Determine the rolling window for volatility calculation
rolling_window = min(global_lookback_period, len(data_liquid.time) - 1)
# Filter the top assets by lowest volatility
low_volatility = qnfilter.filter_volatility(
data=data_liquid,
rolling_window=rolling_window,
top_assets=global_top_assets,
metric="std",
ascending=True
)
# Select data for the low volatility assets
last_asset = low_volatility[-1]
top_assets_indices = last_asset.where(last_asset > 0, drop=True).asset.values
data_low_volatility_assets = data.sel(asset=top_assets_indices)
return data_low_volatility_assets
def load_data(period):
data = qndata.stocks.load_ndx_data(tail=period)
return data
def train_model(data):
data = get_dynamic_stock_data(data)
features_all = get_features(data)
target_all = get_target_classes(data)
models = dict()
asset_name_all = data.asset.values
for asset_name in asset_name_all:
model = get_model()
target_cur = target_all.sel(asset=asset_name).dropna('time', 'any')
features_cur = features_all.sel(asset=asset_name).dropna('time', 'any')
target_for_learn_df, feature_for_learn_df = xr.align(target_cur, features_cur, join='inner')
criterion = nn.MSELoss()
optimiser = optim.LBFGS(model.parameters(), lr=0.08)
epochs = 1
for i in range(epochs):
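            # LBFGS may re-evaluate the objective several times per step, so the forward and backward passes live in a closure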
def closure():
optimiser.zero_grad()
feature_data = feature_for_learn_df.transpose('time', 'feature').values
in_ = torch.tensor(feature_data, dtype=torch.float32).unsqueeze(0)
out = model(in_)
target = torch.zeros(1, len(target_for_learn_df.values))
target[0, :] = torch.tensor(np.array(target_for_learn_df.values))
loss = criterion(out, target)
loss.backward()
return loss
optimiser.step(closure)
models[asset_name] = model
return models
def predict(models, data, state):
models_name = list(models.keys())
data = data.sel(asset=models_name)
features_all = get_features(data)
last_time = data.time.values[-1]
data_last = data.sel(time=slice(last_time, None))
features_last = features_all.sel(time=slice(last_time, None))
weights = xr.zeros_like(data_last.sel(field='close'))
for asset_name in models_name:
features_cur = features_last.sel(asset=asset_name).dropna('time', 'any')
if len(features_cur.time) < 1:
continue
feature_data = features_cur.transpose('time', 'feature').values
in_ = torch.tensor(feature_data, dtype=torch.float32).unsqueeze(0)
out = models[asset_name](in_)
prediction = out.detach()[0]
weights.loc[dict(asset=asset_name, time=features_cur.time.values)] = prediction
# Apply liquidity filter
weights = weights * data_last.sel(field="is_liquid")
    # state may be None, so define a default value
if state is None:
default = xr.zeros_like(data_last.sel(field='close')).isel(time=-1)
state = {
"previous_weights": default,
}
# Extract previous weights from state
if isinstance(state, tuple):
previous_weights = state[-1]['previous_weights']
# created = state[0]
# model = state[1]
# state = state[2]
else:
previous_weights = state['previous_weights']
# Combine the assets from both previous_weights and weights
# This code is needed because the list of liquid assets may change
all_assets = np.union1d(previous_weights.asset.values, weights.asset.values)
# Include new assets from weights in previous_weights, set to 0 if not present
weights = weights.reindex(asset=all_assets, fill_value=0)
previous_weights = previous_weights.reindex(asset=all_assets, fill_value=0)
# Ensure previous_weights includes the last time from weights
if last_time != previous_weights.time.values:
previous_weights = xr.concat([previous_weights, weights], dim='time')
previous_weights = previous_weights.reindex(asset=all_assets, fill_value=0)
# Shift previous weights by one time step to treat the past date as the current date, filling NaN values with 0
previous_weights = previous_weights.shift(time=1).fillna(0)
previous_weights = previous_weights.sel(time=last_time)
# Calculate average weights
weights_avg = xr.where(previous_weights == 0, weights, (weights + previous_weights) / 2)
weights_avg = xr.where(weights_avg < 0.005, 0, weights_avg)
# Normalize positions and cut big positions
weights_sum = abs(weights_avg).sum('asset')
weights_avg = xr.where(weights_sum > 1, weights_avg / weights_sum, weights_avg)
weights_avg = qnexp.cut_big_positions(weights=weights_avg, max_weight=0.1)
next_state = {
"previous_weights": weights_avg.isel(time=-1),
}
#
# print(last_time)
# print("previous_weights")
# print(previous_weights)
# print(weights)
# print("weights_avg")
# print(weights_avg.isel(time=-1))
return weights_avg, next_state
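To illustrate the averaging rule in predict, a tiny numeric sketch (the assets and values are made up):
prev = xr.DataArray([0.0, 0.2], dims="asset", coords={"asset": ["A", "B"]})
curr = xr.DataArray([0.4, 0.1], dims="asset", coords={"asset": ["A", "B"]})
print(xr.where(prev == 0, curr, (curr + prev) / 2).values)  # [0.4  0.15]
# Asset A has no previous weight, so its new weight is kept as-is;
# asset B is averaged with its previous weight: (0.1 + 0.2) / 2 = 0.15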
Multi-pass Version for Development and Testing Strategy¶
For a quick model check, we recommend starting with a short time period. Comment out this code before submitting the strategy to the competition.
weights = qnbt.backtest_ml(
load_data=load_data,
train=train_model,
predict=predict,
train_period=global_train_period,
retrain_interval=180,
retrain_interval_after_submit=1,
predict_each_day=True,
competition_type='stocks_nasdaq100',
lookback_period=global_lookback_period,
start_date='2020-12-30',
build_plots=True
)
Single-pass Version for Participation in the Contest¶
This code helps submissions get processed faster in the contest: the backtest system calculates weights for every day, while the function provided here calculates weights for the last day only.
import gzip
import pickle
from qnt.data import get_env
from qnt.log import log_err, log_info
def state_write(state, path=None):
if path is None:
path = get_env("OUT_STATE_PATH", "state.out.pickle.gz")
try:
with gzip.open(path, 'wb') as gz:
pickle.dump(state, gz)
log_info("State saved: " + str(state))
except Exception as e:
log_err(f"Error saving state: {e}")
def state_read(path=None):
if path is None:
path = get_env("OUT_STATE_PATH", "state.out.pickle.gz")
try:
with gzip.open(path, 'rb') as gz:
state = pickle.load(gz)
log_info("State loaded.")
return state
except Exception as e:
log_err(f"Can't load state: {e}")
return None
state = state_read()
print(state)
# separate cell
def print_stats(data, weights):
stats = qns.calc_stat(data, weights)
display(stats.to_pandas().tail())
performance = stats.to_pandas()["equity"]
qngraph.make_plot_filled(performance.index, performance, name="PnL (Equity)", type="log")
data_train = load_data(global_train_period)
models = train_model(data_train)
data_predict = load_data(global_lookback_period)
last_time = data_predict.time.values[-1]
if last_time < np.datetime64('2006-01-02'):
print("The first state should be None")
state_write(None)
state = state_read()
print(state)
weights_predict, state_new = predict(models, data_predict, state)
print_stats(data_predict, weights_predict)
state_write(state_new)
print(state_new)
qnout.write(weights_predict) # To participate in the competition, save this code in a separate cell.
Machine Learning Model Strategy for Competitive Submissions¶
To enhance your machine learning-based strategy for competitive submissions, consider the following guidelines tailored for efficiency and robustness:
Model Retraining Frequency¶
- The configuration above retrains the model daily after submission (retrain_interval_after_submit=1). For a more streamlined approach, switch the strategy to single-pass mode, which suits the competition environment, and use the available precheck feature for a preliminary quality assessment of your model. If you keep the multi-pass configuration, a less aggressive retraining schedule is sketched below.
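A minimal sketch, reusing the backtest_ml call from above (the value 30 is illustrative, not a recommendation):
weights = qnbt.backtest_ml(
    load_data=load_data,
    train=train_model,
    predict=predict,
    train_period=global_train_period,
    retrain_interval=180,
    retrain_interval_after_submit=30,  # illustrative: retrain monthly after submission instead of daily
    predict_each_day=True,
    competition_type='stocks_nasdaq100',
    lookback_period=global_lookback_period,
    start_date='2020-12-30',
    build_plots=True
)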
Acceleration Techniques¶
To speed up the development process, you might explore:
- Model Simplification: Opt for less complex machine learning models to reduce computational demands.
- Local Development Enhancements: Use a high-performance local computer or deploy your script on a powerful server for faster computations.
- Data Volume Reduction: Limit the dataset size to speed up model training and evaluation.
- Condensed Testing Phases: Shorten the evaluation timeframe by focusing on recent performance, for example the model's financial results over the past year (see the sketch after this list).
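A minimal sketch of the last two points, assuming the multi-pass weights computed above cover the reduced window (the tail length of 365 is illustrative):
# Illustrative: reduce the data volume and evaluate only recent performance
data_small = qndata.stocks.load_ndx_data(tail=365)  # roughly one year of data
weights_recent = weights.sel(time=data_small.time)  # align the multi-pass weights to the window
stats_small = qns.calc_stat(data_small, weights_recent)
display(stats_small.to_pandas().tail())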
Data Preparation and Feature Engineering¶
- Pre-calculated Indicators: Employ pre-calculated technical indicators like Exponential Moving Averages (EMA) to enrich your features without the risk of lookahead bias. For example, g_ema = qnta.ema(data_all.sel(field="high"), 15) prepares the indicator ahead of the model training phase; a sketch of folding it into the feature pipeline follows.
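A sketch of folding such an indicator into the feature pipeline (get_features_with_ema is a hypothetical helper; with two features, set global_count_features_for_ml = 2 so the LSTM input size matches):
def get_features_with_ema(data):
    close_price = data.sel(field="close").ffill('time').bfill('time').fillna(1)
    trend = qnta.roc(qnta.lwma(close_price, 10), 1).ffill('time').bfill('time').fillna(1)
    # The EMA is computed once, ahead of training, so it introduces no lookahead bias
    g_ema = qnta.ema(data.sel(field="high"), 15).ffill('time').bfill('time').fillna(1)
    return xr.concat([trend, g_ema], "feature")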