This template demonstrates how to find optimal parameters for each asset.
It allows you to develop a strategy which will pass the filters easily.
When you start writing your strategies, the easiest way is to use technical analysis. But pure technical analysis probably won't work: you have to adjust the parameters of the technical indicators for each asset.
However, there is a big risk of creating an overfitted strategy when you use such an optimization. There is a way to reduce the impact of overfitting: instead of using one optimal parameter set per asset, you can use several parameter sets for each asset. This example demonstrates how to do it.
Base strategy
%%javascript
// Keep long cell output fully visible instead of collapsing it into a scroll box,
// so the optimization charts below render at full height.
window.IPython && (IPython.OutputArea.prototype._should_scroll = function(lines) { return false; })
// disable widget scrolling
import json
import xarray as xr
import numpy as np
import pandas as pd
import qnt.data as qndata # data loading and manipulation
import qnt.stats as qnstats # key statistics
import qnt.graph as qngraph # graphical tools
import qnt.ta as qnta # technical analysis indicators
import qnt.output as qnout # for writing output
import qnt.log as qnlog # log configuration
import qnt.optimizer as qno # optimizer
# display function for fancy displaying:
from IPython.display import display
# lib for charts
import plotly.graph_objs as go
# Load the futures data set; the extra year before 2006 serves as indicator warm-up.
data = qndata.futures_load_data(min_date='2005-01-01')
At first, let's start with a simple trend based strategy.
def strategy_long(data, asset=None, ma_period=150):
    """Long-only trend strategy: go long while the LWMA slope is positive.

    Args:
        data: market data with a 'field' dimension (uses the 'close' field).
        asset: if given, restrict the computation to this single asset;
            needed so the optimizer can tune parameters per asset.
        ma_period: lookback period of the linear weighted moving average.

    Returns:
        Position weights per (time, asset): 1 while long, 0 while flat.
    """
    # filter by asset, we need it for optimization
    if asset is not None:
        data = data.sel(asset=[asset])
    close = data.sel(field='close')
    ma = qnta.lwma(close, ma_period)
    ma_roc = qnta.roc(ma, 1)  # 1-period rate of change: its sign is the MA slope
    # define signals
    buy_signal = ma_roc > 0
    stop_signal = ma_roc < 0
    # alternative signal idea, kept for experimentation:
    # rsi = qnta.rsi(close, rsi_period)
    # buy_signal = np.logical_and(rsi < 30, ma_roc > 0)
    # stop_signal = np.logical_or(rsi > 90, ma_roc < 0)
    # transform signals to positions
    position = xr.where(buy_signal, 1, np.nan)
    position = xr.where(stop_signal, 0, position)
    # carry the last decision forward; stay flat (0) before the first signal
    position = position.ffill('time').fillna(0)
    # clean the output (not necessary)
    # with qnlog.Settings(info=False,err=False): # suppress logging
    #     position = qnout.clean(position, data)
    return position
Next, see the performance of the strategy
#DEBUG#
# evaluator will remove cells with such marks before evaluation
# Quick check: run the base strategy on all assets and show the latest statistics.
output = strategy_long(data)
# Skip 2005 — it is indicator warm-up history only.
stats = qnstats.calc_stat(data, output.sel(time=slice('2006-01-01',None)))
display(stats.to_pandas().tail())
Search for optimal parameters for all assets.
Let's try to optimize the strategy for all assets and see the performance.
#DEBUG#
# evaluator will remove cells with such marks before evaluation
# Scan ma_period over 10..190 (step 10) with one shared value for all assets.
result_for_all = qno.optimize_strategy(
    data,
    strategy_long,
    qno.full_range_args_generator(ma_period=range(10, 200, 10)),
    workers=1 # you can set more workers on your local PC to speed up
)
#DEBUG#
# evaluator will remove cells with such marks before evaluation
# Chart: sharpe ratio vs ma_period for the shared-parameter optimization.
iterations = result_for_all['iterations']
trace = go.Scatter(
    x=[it['args']['ma_period'] for it in iterations],
    y=[it['result']['sharpe_ratio'] for it in iterations],
    mode="markers",
    name="optimization result",
    marker_size=9,
    marker_color='orange'
)
fig = go.Figure(data=trace)
# fig.update_yaxes(fixedrange=False) # unlock vertical scrolling
fig.show()
print("---")
print("Best iteration:")
display(result_for_all['best_iteration'])
As you see, the result is still bad. That is why you need to optimize the parameters for every asset.
Search for optimal parameters for each asset.
There is 1 parameter for this strategy: ma_period.
We will perform a full range scan. It will take about 11 minutes.
#DEBUG#
# evaluator will remove cells with such marks before evaluation
# Full grid scan: every (ma_period, asset) combination.
result_long = qno.optimize_strategy(
    data,
    strategy_long,
    qno.full_range_args_generator(ma_period=range(10, 200, 10),
                                  asset=data.asset.values.tolist()),
    workers=1 # you can set more workers on your local PC to speed up
)
Observe the results:
###DEBUG###
# evaluator will remove cells with such marks before evaluation
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets
# Best sharpe achieved per asset, sorted descending — feeds the dropdown below.
assets_perf = [dict(
    asset=asset,
    sharpe=max(i['result']['sharpe_ratio'] for i in result_long['iterations'] if i['args']['asset'] == asset)
) for asset in data.asset.values.tolist()]
assets_perf.sort(key=lambda i: -i['sharpe'])
@interact(asset=[(a['asset'] + (" - %.2f" % a['sharpe']), a['asset']) for a in assets_perf])
def display_results(asset=assets_perf[0]['asset']):
    """Plot sharpe ratio vs ma_period for the asset selected in the dropdown."""
    asset_iterations = [i for i in result_long['iterations'] if i['args']['asset'] == asset]
    scatter = go.Scatter(
        x=[i['args']['ma_period'] for i in asset_iterations],
        y=[i['result']['sharpe_ratio'] for i in asset_iterations],
        mode="markers",
        name="optimization result",
        marker_size=9,
        marker_color='orange'
    )  # fixed: a stray trailing comma here made `scatter` a 1-tuple
    fig = go.Figure(data=scatter)
    # fig.update_yaxes(fixedrange=False) # unlock vertical scrolling
    fig.show()
Select assets and optimal parameters
Now, you can select the best parameters for each asset. There is a big chance that you will overfit your strategy, so we will select multiple suitable parameter sets for every asset.
We will select 15 good assets for the strategy. And we will select 3 best suitable parameters per selected asset. The more the better. It will be less likely that your strategy is overfitting.
#DEBUG#
# evaluator will remove cells with such marks before evaluation
def find_best_parameters(result, asset_count, parameter_set_count):
    """Select the top `asset_count` assets and, for each of them, the
    `parameter_set_count` best parameter sets from the optimization result.

    NOTE(review): reads the notebook-global `data` for the asset list instead
    of taking it as an argument — works here, but couples this helper to the
    cell execution order.
    """
    assets = data.asset.values.tolist()
    # rank assets by the combined sharpe of their best parameter sets
    assets.sort(key=lambda a: -asset_weight(result, a, parameter_set_count))
    assets = assets[:asset_count]
    params = []
    for a in assets:
        params += get_best_parameters_for_asset(result, a, parameter_set_count)
    return params
def asset_weight(result, asset, parameter_set_count):
    """Score an asset as the sum of the sharpe ratios of its
    `parameter_set_count` best optimization iterations."""
    sharpes = sorted(
        (it['result']['sharpe_ratio']
         for it in result['iterations']
         if it['args']['asset'] == asset),
        reverse=True,
    )
    return sum(sharpes[:parameter_set_count])
def get_best_parameters_for_asset(result, asset, parameter_set_count):
    """Return the argument dicts of the asset's best iterations,
    highest sharpe ratio first."""
    matching = [it for it in result['iterations'] if it['args']['asset'] == asset]
    ordered = sorted(matching, key=lambda it: it['result']['sharpe_ratio'], reverse=True)
    return [it['args'] for it in ordered[:parameter_set_count]]
# Build and persist the selected (asset, ma_period) parameter sets.
config = find_best_parameters(result=result_long, asset_count=15, parameter_set_count=3)
# If you change the asset_count and/or parameter_set_count, you will get a new strategy.
# Use a context manager so the file handle is closed after writing
# (the original `json.dump(config, open(...))` leaked the handle).
with open('config.json', 'w') as f:
    json.dump(config, f, indent=2)
display(config)
We save the config to a file and then load it back, because all cells marked with #DEBUG# will be removed.
# Load the selected parameter sets; use a context manager so the file
# handle is closed (the original `json.load(open(...))` leaked it).
with open('config.json', 'r') as f:
    config = json.load(f)
Define the result strategy:
def optmized_strategy(data, config):
    """Blend the base strategy over every parameter set in `config`
    and return the equal-weight average of the resulting outputs."""
    # run the base strategy once per (asset, ma_period) parameter set
    outputs = [strategy_long(data, **params) for params in config]
    # put every per-asset output on a common grid; missing cells become 0 weight
    aligned = [o.fillna(0) for o in xr.align(*outputs, join='outer')]
    # equal-weight blend of all parameter sets
    return sum(aligned) / len(aligned)
Result
Let's see the performance of the optimized strategy.
# Build, validate, save and inspect the final output.
output = optmized_strategy(data, config)
output = qnout.clean(output, data) # fix common issues
qnout.check(output, data)
qnout.write(output)
# statistics exclude the 2005 indicator warm-up period
stats = qnstats.calc_stat(data, output.sel(time=slice('2006-01-01',None)))
display(stats.to_pandas().tail())
qngraph.make_major_plots(stats)
The performance is good. Finally, write the output.
We recommend to do research and optimization in a separate notebook. Leave only final code in
strategy.ipynb
.
The final code for the strategy with multi-pass backtester(looking forward test):
import json
import xarray as xr
# NOTE(review): `xruf` is unused below, and xarray.ufuncs was removed in
# modern xarray releases — consider dropping this import.
import xarray.ufuncs as xruf
import numpy as np
import pandas as pd
import qnt.data as qndata
import qnt.stats as qnstats
import qnt.graph as qngraph
import qnt.ta as qnta
import qnt.output as qnout
import qnt.log as qnlog
import qnt.optimizer as qno
import qnt.backtester as qnbk
def strategy_long(data, asset=None, ma_period=150):
    """Long-only trend strategy based on the slope of a linear weighted MA.

    Goes long (weight 1) after the LWMA turns up and flat (0) after it turns
    down; the last decision is carried forward through signal-free days.

    Args:
        data: market data with a 'field' dimension (uses the 'close' field).
        asset: optional single-asset filter, used by the per-asset optimizer.
        ma_period: lookback period of the linear weighted moving average.
    """
    if asset is not None:
        data = data.sel(asset=[asset])
    close = data.sel(field='close')
    # 1-period rate of change of the LWMA: its sign is the trend direction
    trend = qnta.roc(qnta.lwma(close, ma_period), 1)
    # 1 where the MA rises, 0 where it falls, NaN where no signal fires
    raw_position = xr.where(trend > 0, 1, np.nan)
    raw_position = xr.where(trend < 0, 0, raw_position)
    # forward-fill the last decision; stay flat before the first signal
    return raw_position.ffill('time').fillna(0)
def optmized_strategy(data, config):
    """Blend the base strategy over every parameter set in `config`
    and return the equal-weight average of the resulting outputs."""
    # run the base strategy once per (asset, ma_period) parameter set
    outputs = [strategy_long(data, **params) for params in config]
    # put every per-asset output on a common grid; missing cells become 0 weight
    aligned = [o.fillna(0) for o in xr.align(*outputs, join='outer')]
    # equal-weight blend of all parameter sets
    return sum(aligned) / len(aligned)
# Load the parameter sets prepared by the research notebook; use a context
# manager so the file handle is closed (`json.load(open(...))` leaked it).
with open('config.json', 'r') as f:
    config = json.load(f)
# multi-pass
# It may look slow, but it is ok. The evaluator will run only one iteration per day.
qnbk.backtest(
    competition_type='futures',
    lookback_period=365,  # days of history supplied to each pass
    strategy=lambda d: optmized_strategy(d, config),
    # strategy=strategy_long, # you can check the base strategy too
    start_date='2006-01-01'
)
# # single-pass
# data = qndata.futures_load_data(min_date='2005-01-01')
# output = optmized_strategy(data, config)
# output = qnout.clean(output, data)
# stats = qnstats.calc_stat(data, output.sel(time=slice('2006-01-01',None)))
# display(stats.to_pandas().tail())
# qngraph.make_major_plots(stats)
# qnout.check(output, data)
# qnout.write(output)
P.S.
In practice, it makes sense to split the data into the "train" set and the "test" set.
The train set may contain all data excepting the 1-3 last years. The test set will contain these 1-3 last years.
To check the real performance of your strategy and detect overfitting, optimize the parameters using the train set and test your strategy using the "test" set. It will give you a clue.
Before the submission, run your optimizer using all available data.