Strategy Logic
This strategy aims to predict the price movements of the top low-volatility stocks in the S&P 500 index using a fully connected neural network model. The model takes a set of technical indicators as input and outputs predictions for price direction over a one-day interval. The primary goal is to determine which assets are likely to rise and to adjust portfolio weights accordingly.
Key Elements
- Input Features: The strategy uses several technical indicators, such as ATR (Average True Range) divided by the close price and normalized RSI (Relative Strength Index) values computed over multiple time frames (7, 20, 60, and 150 days), to capture different market conditions.
- Neural Network: A fully connected neural network with two hidden layers is used for predictions. The network outputs the likelihood of each asset's price rising.
- Target Classes: The strategy classifies future price movement into two categories, up or down, depending on whether the next day's closing price exceeds the current close.
- Low-Volatility Asset Selection: The top 15 least volatile assets are selected daily to reduce risk and focus on stable assets.
Model Architecture
The neural network model is defined with the following structure:
- Input Layer: Receives the features for the selected assets.
- Hidden Layers: Two layers with ReLU activation are used to capture complex relationships in the data.
- Output Layer: Produces probabilities for each of the top 15 assets, indicating the likelihood of a price increase.
Feature Engineering
The following features are extracted from the asset price data (the exact formulas are given after this list):
- ATR Percentage: Measures volatility as a percentage of the closing price.
- Normalized RSI: RSI values for different periods (7, 20, 60, and 150) are normalized to better capture overbought or oversold conditions.
- Candlestick Feature: a simple candle-body feature computed from the open and close prices and scaled by the ATR.
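For reference, these features correspond to the following formulas, implemented in get_features below ($\text{RSI}_n$ denotes an $n$-period RSI, $\text{ATR}_{14}$ a 14-period ATR):

$$\text{ATR\%} = \frac{\text{ATR}_{14}}{\text{close}}, \qquad \text{nRSI}_n = \frac{(\text{RSI}_n - 50)\sqrt{n}}{65}, \quad n \in \{7, 20, 60, 150\}$$

$$\text{candle} = \frac{\text{close} - \text{open}}{2 \cdot \text{close} \cdot \max(\text{ATR}_{14},\ 0.01)}$$

The $\sqrt{n}$ factor brings RSIs of different periods to roughly comparable magnitudes, since longer-period RSIs deviate less from the neutral level of 50.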
Training and Prediction
Training: The model is trained on historical data for the top 15 low-volatility assets and retrained every 90 days. The network is trained for 100 epochs with the binary cross-entropy loss function, using the Adam optimizer for parameter updates.
Prediction: Once trained, the model predicts whether the price will rise or fall for each of the top assets. An asset's portfolio weight is set to its predicted probability of rising multiplied by 0.1 (capping a single position at 0.1), but only when that probability exceeds 70%.
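In formula form, with $p_{i,t}$ the model's predicted probability that asset $i$ rises, the weight for the most recent day is

$$w_{i,t} = \begin{cases} 0.1 \cdot p_{i,t} & \text{if } p_{i,t} > 0.7, \\ w_{i,t-1} & \text{otherwise,} \end{cases}$$

so no single position exceeds 0.1. The liquidity mask and the exit conditions described below are applied on top, and if the resulting weights sum to more than 1 they are divided by that sum.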
Three types of exit conditions are used: take-profit, stop-loss and maximum holding period. The exits are defined as follows:
Take-Profit Exit: The strategy includes a take-profit mechanism, which triggers when the price of an asset has risen by a set percentage above its entry open price. In this implementation, the take-profit level is 23%: once the close price shows a 23% gain over the entry open price, the position is exited to secure the profit.
Stop-Loss Exit: To protect against significant losses, a stop-loss condition is applied at 5% below the entry open price. If the asset's close price falls 5% below the entry open price, the position is closed, capping the loss at that threshold.
Maximum Holding Period: The strategy also enforces a maximum holding period of 217 days. This ensures that positions are not held indefinitely. If a position remains open for 217 days, it is automatically closed, even if the price has not triggered either the take-profit or stop-loss conditions.
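Combined, a long position entered at open price $\text{open}_{entry}$ is exited on day $t$ as soon as any of the following holds:

$$\text{close}_t \ge 1.23 \cdot \text{open}_{entry} \ \text{(take-profit)} \qquad \text{or} \qquad \text{close}_t \le 0.95 \cdot \text{open}_{entry} \ \text{(stop-loss)},$$

or the position has been held for 217 days (maximum holding period).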
Important! It is necessary to run the ./init.py file once to install the PyTorch dependency.
!pip install torch==2.4.1
import xarray as xr
import pandas as pd
import numpy as np
import random
import torch
from torch import nn, optim
import qnt.data as qndata
import qnt.output as qnout
import qnt.backtester as qnbt
import qnt.stats as qnstats
import qnt.graph as qngraph
import qnt.ta as qnta
import qnt.xr_talib as xr_talib
import qnt.state as qnstate
import qnt.exits as qnte
import qnt.filter as qnfilter
import qnt.exposure as qnexp
from torch.utils.data import DataLoader, TensorDataset
global_lookback_period = 450  # days of history passed to predict() on each iteration
global_train_period = 250  # days of history used when (re)training the model
global_count_features_for_ml = 6  # number of features per asset
prediction_interval = 1  # predict price direction one day ahead
global_top_assets = 15  # number of low-volatility assets traded
# Define the neural network model
class FullyConnectedNN(nn.Module):
def __init__(self, input_dim):
super(FullyConnectedNN, self).__init__()
self.model = nn.Sequential(
nn.Linear(input_dim, 128),
nn.ReLU(),
nn.Linear(128, 64),
nn.ReLU(),
nn.Linear(64, global_top_assets),
nn.Sigmoid()
)
def forward(self, x):
return self.model(x)
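# With the defaults above, input_dim = global_top_assets * global_count_features_for_ml = 15 * 6 = 90,
# so the network maps a (batch, 90) feature vector to (batch, 15) per-asset probabilities of a next-day rise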
# Set seed for reproducibility
def set_seed(seed_value=42):
random.seed(seed_value)
np.random.seed(seed_value)
torch.manual_seed(seed_value)
torch.cuda.manual_seed(seed_value)
torch.cuda.manual_seed_all(seed_value)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
def get_model():
set_seed(42)
input_dim = global_top_assets * global_count_features_for_ml
model = FullyConnectedNN(input_dim)
return model
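# Build the per-asset feature set: ATR as a fraction of the close price, four normalized RSIs
# (periods 7, 20, 60, 150), and a simple candle-body feature scaled by ATR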
def get_features(data):
close = data.sel(field="close").ffill('time').bfill('time').fillna(1)
high = data.sel(field="high").ffill('time').bfill('time').fillna(1)
low = data.sel(field="low").ffill('time').bfill('time').fillna(1)
open = data.sel(field="open").ffill('time').bfill('time').fillna(1)
atr = qnta.atr(high, low, close, 14).ffill('time').bfill('time').fillna(0)
atr_perc = (atr / close).ffill('time').bfill('time').fillna(0)
rsi = qnta.rsi(close, 7).ffill('time').bfill('time').fillna(0)
normalized_rsi7 = ((rsi - 50) * (7 ** 0.5) / 65).ffill('time').bfill('time').fillna(0)
rsi20 = qnta.rsi(close, 20).ffill('time').bfill('time').fillna(0)
normalized_rsi20 = ((rsi20 - 50) * (20 ** 0.5) / 65).ffill('time').bfill('time').fillna(0)
rsi60 = qnta.rsi(close, 60).ffill('time').bfill('time').fillna(0)
normalized_rsi60 = ((rsi60 - 50) * (60 ** 0.5) / 65).ffill('time').bfill('time').fillna(0)
rsi150 = qnta.rsi(close, 150).ffill('time').bfill('time').fillna(0)
normalized_rsi150 = ((rsi150 - 50) * (150 ** 0.5) / 65).ffill('time').bfill('time').fillna(0)
candle = ((close - open) / (close * 2 * np.maximum(atr, 0.01))).ffill('time').bfill('time').fillna(0)
    features = xr.concat([normalized_rsi7, normalized_rsi20, candle, normalized_rsi60, normalized_rsi150, atr_perc], "feature")
return features
# Build binary targets: 1 if the next day's close is above today's close, 0 otherwise
def get_target_classes(data):
    close_price = data.sel(field='close')
    future_price = qnta.shift(close_price, -1)
    threshold = close_price
    class_positive = 1
    class_negative = 0
    target_price_up = xr.where(future_price > threshold, class_positive, class_negative)
    return target_price_up
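# Select the trading universe: among assets that are liquid on the most recent date, keep the
# global_top_assets names with the lowest rolling volatility (metric="std"), together with their full history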
def get_top_low_volatility_stocks(data):
is_liquid = data.sel(field="is_liquid")
last_assets = is_liquid[-1]
is_liquid_asset_list = last_assets.where(last_assets > 0, drop=True).asset.values
data_liquid = data.sel(asset=is_liquid_asset_list)
rolling_window = min(global_lookback_period, len(data_liquid.time) - 1)
low_volatility = qnfilter.filter_volatility(
data=data_liquid,
rolling_window=rolling_window,
top_assets=global_top_assets,
metric="std",
ascending=True
)
last_asset = low_volatility[-1]
top_assets_indices = last_asset.where(last_asset > 0, drop=True).asset.values
data_all_dates = data.sel(asset=top_assets_indices)
return data_all_dates
# Load data
def load_data(period):
data = qndata.stocks.load_spx_data(tail=period)
return data
# Train the model
def train_model(data):
data_train = get_top_low_volatility_stocks(data)
features_all = get_features(data_train)
target_all = get_target_classes(data_train)
target_all, features_all = xr.align(target_all, features_all, join='inner')
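    # Flatten (time, feature, asset) into one training row per day: 15 assets * 6 features = 90 inputs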
feature_data = features_all.transpose('time', 'feature', 'asset').values.reshape(-1, global_top_assets * global_count_features_for_ml)
target_data = target_all.transpose('time', 'asset').values
feature_data = torch.tensor(feature_data, dtype=torch.float32)
target_data = torch.tensor(target_data, dtype=torch.float32)
model = get_model()
criterion = nn.BCELoss()
optimiser = optim.Adam(model.parameters(), lr=0.002)
epochs = 100
for epoch in range(epochs):
optimiser.zero_grad()
out = model(feature_data)
loss = criterion(out, target_data)
# print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item()}")
loss.backward()
optimiser.step()
return model
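# Predict the latest day's portfolio weights and apply exit logic; `state` carries the weights,
# holding times, entry open prices, and the trained model between backtester iterations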
def predict(model, data, state):
last_data_time = data.time.values[-1]
if state is None or state['weights'].time.values[-1] > last_data_time:
        state = {
            "weights": xr.zeros_like(data.sel(field='close')),
            "holding_time": xr.zeros_like(data.isel(time=-1).asset, dtype=int),
            "model": None,
            # NaN marks "no open position yet"; dtype must be float, since NaN cannot be stored in an int array
            "open_price": xr.full_like(data.isel(time=-1).asset, np.nan, dtype=float)
        }
qnstate.write(state)
weights_prev = state['weights']
atr14 = qnta.atr(data.sel(field='high'), data.sel(field='low'), data.sel(field='close'), 14)
last_atr = atr14.isel(time=-1)
data_top_assets = get_top_low_volatility_stocks(data)
last_time = data_top_assets.time.values[-1]
features_last_day = get_features(data_top_assets).sel(time=data_top_assets.time[-1])
weights = xr.zeros_like(data.sel(field='close'))
weights_prev, weights = xr.align(weights_prev, weights, join='right')
    # First, rewrite previous weights for all assets and dates
weights = xr.where(weights_prev > 0, weights_prev, weights)
# Rewrite previous day's weights for last day
weights.loc[dict(time=last_time)] = weights_prev.shift(time=1).loc[dict(time=last_time)]
# Prepare the features for prediction
feature_data = features_last_day.transpose('feature', 'asset').values.reshape(1, -1)
feature_data = torch.tensor(feature_data, dtype=torch.float32)
# Predict new weights using the model for the top low volatility assets
with torch.no_grad():
out = model(feature_data)
predictions = out.squeeze().numpy()
# Update the weights for the predicted top assets with the model's predictions
    for idx, asset_name in enumerate(data_top_assets.asset.values):
        if predictions[idx] > 0.7:
            weights.loc[dict(asset=asset_name, time=last_time)] = predictions[idx] * 0.1
weights = weights * data.sel(field="is_liquid")
    # Guard: the tradable asset universe can change between iterations, and the exit helpers
    # align arrays with join='exact' (xr.where inside qnte.max_hold_long), which raises a
    # ValueError when the 'asset' coordinates differ. Reindex the per-asset state first.
    state['holding_time'] = state['holding_time'].reindex(asset=weights.asset.values, fill_value=0)
    state['open_price'] = state['open_price'].reindex(asset=weights.asset.values, fill_value=np.nan)
    signal_dc = qnte.max_hold_long(weights, state, max_period=217)
    open_price = qnte.update_open_price(data, weights, state)
    tpLong = qnte.take_profit_long_percentage(data, weights, open_price, percent=23)
    slLong = qnte.stop_loss_long_percentage(data, weights, open_price, percent=5)
    weights = weights * tpLong * slLong * signal_dc
    weights_sum = abs(weights).sum('asset')
    weights = xr.where(weights_sum > 1, weights / weights_sum, weights)
    weights = weights.fillna(0)
state['weights'] = weights
state['model'] = model
return weights, state
# Backtesting
weights = qnbt.backtest_ml(
load_data=load_data,
train=train_model,
predict=predict,
train_period=global_train_period,
retrain_interval=90,
retrain_interval_after_submit=90,
predict_each_day=True,
competition_type='stocks_s&p500',
lookback_period=global_lookback_period,
start_date='2006-01-01',
build_plots=True
)