"""Machine learning based optimiser for the strategy parameters.
- Users scikit-optimize library
- Similar as grid seacrh
- Instead of searching all parameter combinations, search only some based on an algorithm
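Example (a rough sketch, not a verbatim recipe; the ``MyParameters`` class, ``decide_trades``,
``create_indicators`` and ``strategy_universe`` are assumed to exist in your notebook, and the exact
arguments of :py:func:`perform_optimisation` are documented on the function itself):

.. code-block:: python

    # Describe fixed parameters and the search space with scikit-optimize dimensions
    parameters = prepare_optimiser_parameters(MyParameters)

    # Run the optimiser; optimise_profit is a SearchFunction,
    # see SearchFunction below for an example implementation
    optimiser_result = perform_optimisation(
        search_func=optimise_profit,
        decide_trades=decide_trades,
        create_indicators=create_indicators,
        strategy_universe=strategy_universe,
        parameters=parameters,
    )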
"""
import dataclasses
import datetime
import inspect
import logging
import os
import typing
import warnings
from decimal import Decimal
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Type
import psutil
from joblib import Parallel, delayed
from skopt import space, Optimizer
from skopt.space import Dimension
from tqdm_loggable.auto import tqdm
from tradeexecutor.backtest.grid_search import (
    GridCombination,
    GridSearchDataRetention,
    GridSearchResult,
    save_disk_multiprocess_strategy_universe,
    initialise_multiprocess_strategy_universe_from_disk,
    run_grid_search_backtest,
    get_grid_search_result_path,
    GridParameter,
)
from tradeexecutor.cli.log import setup_notebook_logging
from tradeexecutor.strategy.engine_version import TradingStrategyEngineVersion
from tradeexecutor.strategy.execution_context import ExecutionContext, grid_search_execution_context, scikit_optimizer_context
from tradeexecutor.strategy.pandas_trader.indicator import DiskIndicatorStorage, CreateIndicatorsProtocol, IndicatorStorage
from tradeexecutor.strategy.parameters import StrategyParameters
from tradeexecutor.strategy.strategy_module import DecideTradesProtocol4
from tradeexecutor.strategy.trading_strategy_universe import TradingStrategyUniverse
from tradeexecutor.utils.cpu import get_safe_max_workers_count
logger = logging.getLogger(__name__)
@dataclass(slots=True)
class OptimiserSearchResult:
"""Single optimiser search result value.
This is used in different contexts:
- Return value from :py:class:`SearchFunction`
- Passing data from child worker to the parent process
- Passing data from :py:func:`perform_optimisation` to the notebook
"""
#: The raw value of the search function we are optimising
value: float
#: Did we flip this value to negative because we are looking for a minimised result
negative: bool
#: For which grid combination this result was
#:
#: This is the key to load the child worker produced data from the disk in the parent process
#:
combination: GridCombination | None = None
#: We do not pass the result directly back to the parent process, but let the parent process read it from the disk.
#:
#: Call :py:meth:`hydrate` to make this data available.
#:
result: GridSearchResult | None = None
#: Did we filter out this result
#:
#: See `result_filter` in :py:func:`perform_optimisation`
#:
filtered: bool = False
#: How long it took to run this iteration
#:
#: Filled by main thread
iteration_duration: datetime.timedelta | None = None
#: Iteration id
#:
#: Filled by main thread
iteration: int | None = None
def __repr__(self):
return f"<OptimiserSearchResult {self.combination} = {self.get_original_value()}>"
# Allow calling min(list[OptimiserSearchResult]) to find the best value
def __eq__(self, other):
return self.value == other.value
def __lt__(self, other):
return self.value < other.value
def __gt__(self, other):
return self.value > other.value
def get_original_value(self) -> float:
"""Get the original best search value.
- Strip off the extra minus sign if we had to add one
"""
if self.negative:
return -self.value
else:
return self.value
def hydrate(self):
"""Load the grid search result data for this result from the disk."""
self.result = GridSearchResult.load(self.combination)
def is_valid(self) -> bool:
"""Return True if we could calculate a value for this result."""
return self.value is not None
def get_metrics_persistent_size(self) -> int:
"""Get the size of stored metrics."""
path = self.combination.get_metrics_pickle_path()
assert path.exists(), f"GridSearchResult {path} does not exist"
return path.stat().st_size
def get_state_size(self) -> int:
"""Get the size of stored state."""
path = self.combination.get_compressed_state_file_path()
assert path.exists(), f"GridSearchResult {path} does not exist"
return path.stat().st_size
class SearchFunction(typing.Protocol):
"""The function definition for the optimiser search function.
- The search function extracts the result variable we want to optimise
for from the :py:class:`GridSearchResult`
Example:
.. code-block:: python
# Search for the best CAGR value.
def optimise_profit(result: GridSearchResult) -> OptimiserSearchResult:
return OptimiserSearchResult(-result.get_cagr(), negative=True)
"""
def __call__(self, result: GridSearchResult) -> OptimiserSearchResult:
"""The search function extracts the value we want to optimise from the backtest result.
:param result:
The latest backtest result as grid search result object.
:return:
Value of the function to optimise.
Smaller is better. E.g. if you want to optimise profit, return negative profit.
"""
class ResultFilter(typing.Protocol):
"""Applied to drop bad optimiser results.
- E.g. by :py:class:`MinTradeCountFilter`
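Example (a sketch of a custom filter; :py:class:`MinTradeCountFilter` below is a ready-made implementation,
and the filter is passed as ``result_filter`` to :py:func:`perform_optimisation`):

.. code-block:: python

    def accept_profitable_only(result: GridSearchResult) -> bool:
        # Drop any parameter combination that did not end up profitable
        return result.get_cagr() > 0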
"""
def __call__(self, result: GridSearchResult) -> bool:
"""Return True if the optimiser result satisfies our filter criteria."""
pass
@dataclass(frozen=True, slots=True)
class OptimiserResult:
"""The outcome of the optimiser run.
- Contains all the grid search results we generated during the run
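Example (a sketch; ``optimiser_result`` is the value returned by :py:func:`perform_optimisation`):

.. code-block:: python

    # Results are sorted from the best to the worst
    best = optimiser_result.results[0]
    print(best.combination, best.get_original_value())

    # Feed the non-filtered results to the grid search analysis tools
    grid_search_results = optimiser_result.get_results_as_grid_search_results()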
"""
#: The parameters we searched
#:
#: Both fixed and search space.
#:
parameters: StrategyParameters
#: Where we store the grid search results data
result_path: Path
#: Different grid search results
#:
#: Sorted from the best to the worst.
#:
results: list[OptimiserSearchResult]
#: Where did we store precalculated indicator files.
#:
#: Allows peeking into the raw indicator data if we need to.
indicator_storage: DiskIndicatorStorage
def is_empty(self) -> bool:
"""The optimiser run generated zero results."""
return len(self.results) == 0
@staticmethod
def determine_result_path(result_path: Path | None, parameters: StrategyParameters) -> Path:
"""Generate the result path or use the existing one."""
if result_path:
assert isinstance(result_path, Path)
return result_path
name_hint = parameters.get("id") or parameters.get("name")
assert name_hint, f"Cannot determine parameter id or name for StrategyParameters, needed for optimiser search result storage: {parameters}"
return get_grid_search_result_path(name_hint)
def get_combination_count(self) -> int:
"""How many combinations we searched in this optimiser run."""
return len(self.results)
def get_results_as_grid_search_results(self) -> list[GridSearchResult]:
"""Get all search results as grid search results list for the analysis.
- Any results that are marked as filtered away are not returned
"""
return [r.result for r in self.results if not r.filtered]
def get_cached_count(self) -> int:
"""How many of the results were directly read from the disk and not calculated on this run."""
return len([r for r in self.results if r.result.cached])
def get_failed_count(self) -> int:
"""How many backtest runs failed with an exception."""
return len([r for r in self.results if r.result.exception is not None])
def get_filtered_count(self) -> int:
"""How many of the results were filtered out by result filter.
See :py:func:`perform_optimisation`
"""
return len([r for r in self.results if r.filtered])
class ObjectiveWrapper:
"""Middleware between the optimiser and the Trading Strategy framework.
- Passes the fixed data to the child workers
"""
def __init__(
self,
pickled_universe_fname: str,
search_func: SearchFunction,
parameters: StrategyParameters,
trading_strategy_engine_version: str,
decide_trades: DecideTradesProtocol4,
create_indicators: CreateIndicatorsProtocol,
indicator_storage: IndicatorStorage,
result_path: Path,
search_space: list[Dimension],
real_space_rounding: Decimal,
log_level: int,
result_filter: ResultFilter,
draw_visualisation: bool,
ignore_wallet_errors: bool,
):
self.search_func = search_func
self.pickled_universe_fname = pickled_universe_fname
self.parameters = parameters
self.decide_trades = decide_trades
self.create_indicators = create_indicators
self.indicator_storage = indicator_storage
self.trading_strategy_engine_version = trading_strategy_engine_version
self.result_path = result_path
self.search_space = search_space
self.real_space_rounding = real_space_rounding
self.log_level = log_level
self.result_filter = result_filter
self.filtered_result_value = 0
self.draw_visualisation = draw_visualisation
self.ignore_wallet_errors = ignore_wallet_errors
def __call__(
self,
result_index: int,
args: list[str | int | float],
):
"""This function is at the entry point of a child worker process.
- Sets up a backtest within a child process,
so that the results are stored as :py:class:`GridSearchResult`
:param args:
The current search space values from the optimiser.
"""
assert type(result_index) == int
assert type(args) == list, f"Expected list of args, got {args}"
if self.log_level:
setup_notebook_logging(self.log_level, show_process=True)
logger.info("Starting optimiser batch %d in child worker %d", result_index, os.getpid())
strategy_universe = initialise_multiprocess_strategy_universe_from_disk(self.pickled_universe_fname)
combination = create_grid_combination(
self.result_path,
self.search_space,
result_index,
args,
self.real_space_rounding,
)
if GridSearchResult.has_result(combination):
# We have run this search point before and can load from the cache
result = GridSearchResult.load(combination)
else:
# We are running this search point for the first time
# Merge the current search values with the fixed parameters
merged_parameters = StrategyParameters.from_dict(self.parameters)
merged_parameters.update({p.name: p.get_computable_value() for p in combination.parameters})
# Make sure we drag the engine version along
execution_context = dataclasses.replace(scikit_optimizer_context)
execution_context.engine_version = self.trading_strategy_engine_version
execution_context.force_visualisation = self.draw_visualisation
result = run_grid_search_backtest(
combination,
decide_trades=self.decide_trades,
universe=strategy_universe,
create_indicators=self.create_indicators,
parameters=merged_parameters,
indicator_storage=self.indicator_storage,
trading_strategy_engine_version=self.trading_strategy_engine_version,
execution_context=execution_context,
max_workers=1, # Don't allow this child process to create its own worker pool for indicator calculations
initial_deposit=merged_parameters["initial_cash"],
ignore_wallet_errors=self.ignore_wallet_errors,
)
result.save(include_state=True)
if getattr(result, "exception", None) is None: # Legacy pickle compat
opt_result = self.search_func(result)
# Apply result filter and zero out the value for optimiser if needed
if not self.result_filter(result):
opt_result.value = self.filtered_result_value # Zero out the actual value so optimiser knows this is a bad path
opt_result.filtered = True # Tell the caller that this is a filtered result and should not be displayed by default
else:
# The backtest crashed with an exception,
# likely OutOfBalance
opt_result = OptimiserSearchResult(self.filtered_result_value, negative=False)
opt_result.combination = combination
logger.info("Optimiser for combination %s resulted to %s, exception is %s, exiting child process", combination, opt_result.value, result.exception)
opt_result.result = None  # Keep the payload out of the interprocess transfer; the parent process reloads it with hydrate()
return opt_result
def get_optimised_dimensions(parameters: StrategyParameters) -> list[space.Dimension]:
"""Get all dimensions we are going to search."""
return [p for p in parameters.values() if isinstance(p, space.Dimension)]
def prepare_optimiser_parameters(
param_class: Type,
) -> StrategyParameters:
"""Optimised parameters must be expressed using scikit-optimise internals.
- The parameters class must contain at least one parameter that is an instance of a :py:class:`Dimension` subclass
- Unlike in a grid search, the dimension parameters are passed as-is and are not expanded into
combinations, as the optimiser's iteration count determines how many points of the search space are explored
- Assign a name to each dimension
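Example (a sketch; the class name and the dimension ranges are illustrative only):

.. code-block:: python

    from skopt import space

    class MyParameters:
        # Fixed parameters
        id = "my-optimised-strategy"
        initial_cash = 10_000

        # Searched parameters, expressed as scikit-optimize dimensions
        rsi_length = space.Integer(4, 32)
        stop_loss_pct = space.Real(0.90, 0.99)

    parameters = prepare_optimiser_parameters(MyParameters)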
:param param_class:
Parameters class in your notebook
:param warn_float:
Warn about unbounded float in the parameters, as it will hurt the performance, because we cannot cache results for arbitrary values.
:return:
Validated parameters.
"""
assert inspect.isclass(param_class), f"Expected class, got {type(param_class)}"
parameters = StrategyParameters.from_class(param_class)
assert any([isinstance(v, space.Dimension) for v in parameters.values()]), f"Not a single scikit-optimize Dimension instance detected in {parameters}"
# Assign names to the dimensions
for key, value in parameters.iterate_parameters():
if isinstance(value, space.Dimension):
value.name = key
return parameters
def create_grid_combination(
result_path: Path,
search_space: list[Dimension],
index: int,
search_args: list[str | float | int],
real_space_rounding: Decimal,
) -> GridCombination:
"""Turn scikit-optimise search arguments to a grid combination.
GridCombination allows us to store the results on the disk
and match the data for the later analysis and caching.
"""
# Convert scikit-optimise determined search space parameters
# for this round to GridParameters, which are used as a cache key
assert type(search_space) == list
assert type(search_args) == list
assert len(search_space) == len(search_args), f"Got {len(search_space)}, {len(search_args)}"
parameters = []
for dim, val in zip(search_space, search_args):
# Round real numbers in the search space
# to some manageable values we can use in filenames
if isinstance(dim, space.Real):
val = Decimal(val).quantize(real_space_rounding)
assert dim.name, f"Dimension unnamed: {dim}. Did you call prepare_optimiser_parameters()? Do not call StrategyParameters.from_class()."
p = GridParameter(name=dim.name, value=val, single=True, optimise=True)
parameters.append(p)
combination = GridCombination(
index=index,
result_path=result_path,
parameters=tuple(parameters),
)
return combination
class MinTradeCountFilter:
"""Require a minimum number of created trading positions.
Avoids strategies that only look good because of a single or a few randomly successful opens and closes.
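Example (a sketch; the filter is passed as ``result_filter`` to :py:func:`perform_optimisation`):

.. code-block:: python

    # Discard any parameter combination that opened 50 or fewer positions
    result_filter = MinTradeCountFilter(min_trade_count=50)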
"""
def __init__(self, min_trade_count):
self.min_trade_count = min_trade_count
def __call__(self, result: GridSearchResult) -> bool:
"""Return True if the trade count threshold is exceeded."""
return result.get_trade_count() > self.min_trade_count