"""Indicator definitions."""
import datetime
import threading
from abc import ABC, abstractmethod
# Enable pickle patch that allows multiprocessing in notebooks
from tradeexecutor.monkeypatch import cloudpickle_patch
import concurrent
import enum
import inspect
import itertools
import os
import pickle
import shutil
import signal
import sys
import tempfile
from collections.abc import Iterable
from dataclasses import dataclass
from multiprocessing import Process
from pathlib import Path
from types import NoneType
from typing import Callable, Protocol, Any, TypeAlias
import logging
import futureproof
import pandas as pd
from tqdm_loggable.auto import tqdm
from tradeexecutor.state.identifier import TradingPairIdentifier
from tradeexecutor.strategy.execution_context import ExecutionContext
from tradeexecutor.strategy.parameters import StrategyParameters
from tradeexecutor.strategy.trading_strategy_universe import TradingStrategyUniverse, UniverseCacheKey
from tradeexecutor.utils.cpu import get_safe_max_workers_count
# Module-level logger, named after this module
logger = logging.getLogger(__name__)

#: Default directory where precalculated indicator Parquet files are kept.
#:
#: Resolves to ``~/.cache/indicators`` under the current user's home directory.
#:
DEFAULT_INDICATOR_STORAGE_PATH = Path(os.path.expanduser("~/.cache/indicators"))
class IndicatorCalculationFailed(Exception):
    """Raised when an indicator could not be calculated.

    - Wraps the underlying Python exception into a friendlier error message
    """
class IndicatorFunctionSignatureMismatch(Exception):
    """Raised when the given Python function cannot run on the passed parameters."""
class IndicatorSource(enum.Enum):
    """The data on which the indicator will be calculated."""

    #: Indicator is fed with the candle close price.
    #:
    #: Example indicators
    #:
    #: - RSI
    #: - Moving average
    #:
    close_price = "close_price"

    #: Indicator is fed with the candle open price.
    #:
    #: Not used commonly
    #:
    open_price = "open_price"

    #: Indicator is fed with multiple data points (open, high, low, close, volume).
    #:
    #: Example indicators
    #:
    #: - Money flow index (MFI) reads close, high, low columns
    #:
    #: The indicator function can take arguments named: open, high, low, close, volume
    #: which all are Pandas US dollar series. Arguments the function does not declare are not passed.
    #:
    ohlcv = "ohlcv"

    #: Indicator is calculated once per the strategy universe.
    #:
    #: These indicators are custom and do not have a trading pair set
    #:
    strategy_universe = "strategy_universe"

    def is_per_pair(self) -> bool:
        """Tell if this source is evaluated separately for every trading pair.

        :return:
            ``True`` for the open price, close price and OHLCV sources,
            ``False`` otherwise.
        """
        per_pair_sources = {
            IndicatorSource.open_price,
            IndicatorSource.close_price,
            IndicatorSource.ohlcv,
        }
        return self in per_pair_sources
def _flatten_index(series: pd.Series) -> pd.Series:
"""Ensure that any per-pair series we have has DatetimeIndex, not MultiIndex."""
if isinstance(series.index, pd.DatetimeIndex):
return series
if isinstance(series.index, pd.MultiIndex):
new_index = series.index.get_level_values(1) # assume pair id, timestamp tuples
assert isinstance(new_index, pd.DatetimeIndex)
series_2 = series.copy()
series_2.index = new_index
return series_2
else:
raise NotImplementedError(f"Unknown index: {series.index}")
@dataclass(slots=True)
class IndicatorDefinition:
    """A definition for a single indicator.

    - Indicator definitions are static - they do not change between the strategy runs

    - Used as id for the caching the indicator results

    - Definitions are used to calculate indicators for all trading pairs,
      or once over the whole trading universe

    - Indicators are calculated independently from each other -
      a calculation cannot access cached values of other calculation
    """

    #: Name of this indicator.
    #:
    #: Later in `decide_trades()` you use this name to access the indicator data.
    #:
    name: str

    #: The underlying function we use to calculate the indicator.
    #:
    #: Same function can be part of multiple indicators with different parameters (length).
    #:
    #: Because function pickling issues, this may be set to ``None`` in results.
    #:
    func: Callable | None

    #: Parameters for building this indicator.
    #:
    #: - Each key is a function argument name for :py:attr:`func`.
    #: - Each value is a single value
    #:
    #: - Grid search multiple parameter ranges are handled outside indicator definition
    #:
    parameters: dict

    #: On what trading universe data this indicator is calculated
    #:
    source: IndicatorSource = IndicatorSource.close_price

    def __repr__(self):
        return f"<Indicator {self.name} using {self.func.__name__ if self.func else '?()'} for {self.parameters}>"

    def __eq__(self, other):
        # The function itself is deliberately not compared: it may be None after pickling
        return self.name == other.name and self.parameters == other.parameters and self.source == other.source

    def __hash__(self):
        """Hash by name, parameters and source.

        :raise TypeError:
            If any parameter value is unhashable, e.g. a grid search list
            was left in the parameters.
        """
        # https://stackoverflow.com/a/5884123/315168
        try:
            return hash((self.name, frozenset(self.parameters.items()), self.source))
        except Exception as e:
            # Must raise a real exception instance: raising a bare string is itself an error
            # and would hide the helpful message below
            raise TypeError(f"Could not hash {self}. If changing grid search to backtest, remember to change lists to single value. Exception is {e}") from e

    def __post_init__(self):
        assert type(self.name) == str
        assert type(self.parameters) == dict
        if self.func is not None:
            assert callable(self.func)
            validate_function_kwargs(self.func, self.parameters)

    def is_needed_for_pair(self, pair: TradingPairIdentifier) -> bool:
        """Currently indicators are calculated for spot pairs only."""
        return pair.is_spot()

    def is_per_pair(self) -> bool:
        """Is this indicator calculated separately for each trading pair."""
        return self.source.is_per_pair()

    def calculate_by_pair(self, input: pd.Series) -> pd.DataFrame | pd.Series:
        """Calculate the underlying indicator value.

        :param input:
            Price series used as input.

        :return:
            Single or multi series data.

            - Multi-value indicators return DataFrame with multiple columns (BB).
            - Single-value indicators return Series (RSI, SMA).

        :raise IndicatorCalculationFailed:
            If the indicator function raised, or returned something else
            than a Series or a DataFrame.
        """
        try:
            input_fixed = _flatten_index(input)
            ret = self.func(input_fixed, **self.parameters)
            return self._check_good_return_value(ret)
        except Exception as e:
            raise IndicatorCalculationFailed(f"Could not calculate indicator {self.name} ({self.func}) for parameters {self.parameters}, input data is {len(input)} rows") from e

    def calculate_by_pair_ohlcv(self, candles: pd.DataFrame) -> pd.DataFrame | pd.Series:
        """Calculate the underlying OHLCV indicator value.

        Assume function can take parameters: `open`, `high`, `low`, `close`, `volume`,
        or any combination of those.

        :param candles:
            Raw OHLCV candles data.

        :return:
            Single or multi series data.

            - Multi-value indicators return DataFrame with multiple columns (BB).
            - Single-value indicators return Series (RSI, SMA).

        :raise IndicatorCalculationFailed:
            If the function takes none of the OHLCV arguments, raised,
            or returned something else than a Series or a DataFrame.
        """
        assert isinstance(candles, pd.DataFrame), "OHLCV-based indicator function must be fed with a DataFrame"

        input_fixed = _flatten_index(candles)

        needed_args = ("open", "high", "low", "close", "volume")
        func_args = inspect.getfullargspec(self.func).args

        # Pass only those candle columns the function declares as arguments
        full_kwargs = {a: input_fixed[a] for a in needed_args if a in func_args}

        if len(full_kwargs) == 0:
            raise IndicatorCalculationFailed(f"Could not calculate OHLCV indicator {self.name} ({self.func}): does not take any of function arguments from {needed_args}")

        full_kwargs.update(self.parameters)

        try:
            ret = self.func(**full_kwargs)
            return self._check_good_return_value(ret)
        except Exception as e:
            raise IndicatorCalculationFailed(f"Could not calculate indicator {self.name} ({self.func}) for parameters {self.parameters}, candles is {len(candles)} rows, {candles.columns} columns") from e

    def calculate_universe(self, input: TradingStrategyUniverse) -> pd.DataFrame | pd.Series:
        """Calculate the underlying indicator value over the whole strategy universe.

        :param input:
            The strategy universe handed to the indicator function as its first argument.

        :return:
            Single or multi series data.

            - Multi-value indicators return DataFrame with multiple columns (BB).
            - Single-value indicators return Series (RSI, SMA).

        :raise IndicatorCalculationFailed:
            If the indicator function raised, or returned something else
            than a Series or a DataFrame.
        """
        try:
            ret = self.func(input, **self.parameters)
            return self._check_good_return_value(ret)
        except Exception as e:
            raise IndicatorCalculationFailed(f"Could not calculate indicator {self.name} ({self.func}) for parameters {self.parameters}, input universe is {input}.\nException is {e}\n\n To use Python debugger, set `max_workers=1`, and if doing a grid search, also set `multiprocess=False`") from e

    def _check_good_return_value(self, df):
        """Assert the indicator function returned a Series or a DataFrame and pass it through."""
        assert isinstance(df, (pd.Series, pd.DataFrame)), f"Indicator did not return pd.DataFrame or pd.Series: {self.name}, we got {type(df)}"
        return df
@dataclass(slots=True, frozen=True)
class IndicatorKey:
    """Cache key used to read indicator results.

    - Describes one of the indicator combinations we need to create

    - Serves as the key in the indicator result caching
    """

    #: Trading pair if this indicator is specific to a pair
    #:
    #: ``None`` if this indicator is for the whole strategy
    #:
    pair: TradingPairIdentifier | None

    #: The definition of this indicator
    definition: IndicatorDefinition

    def __post_init__(self):
        assert isinstance(self.pair, (TradingPairIdentifier, NoneType))
        assert isinstance(self.definition, IndicatorDefinition)

    def __repr__(self):
        return f"<IndicatorKey {self.get_cache_key()}>"

    def get_cache_id(self) -> str:
        """Get the pair ticker, or ``universe`` for indicators without a pair."""
        if self.pair is None:
            # Indicator calculated over the universe
            assert self.definition.source == IndicatorSource.strategy_universe
            return "universe"
        return self.pair.get_ticker()

    def __eq__(self, other):
        return self.pair == other.pair and self.definition == other.definition

    def __hash__(self):
        return hash((self.pair, self.definition))

    def get_cache_key(self) -> str:
        """Build the string key: indicator name, its parameters and the pair ticker (or ``universe``)."""
        slug = self.pair.get_ticker() if self.pair else "universe"

        formatted = []
        for name, value in self.definition.parameters.items():
            # Enums are serialised by their value, everything else by str()
            text = str(value.value) if isinstance(value, enum.Enum) else str(value)
            formatted.append(f"{name}={text}")

        parameters = ",".join(formatted)
        return f"{self.definition.name}({parameters})-{slug}"
class IndicatorSet:
    """Define the indicators that are needed by a trading strategy.

    - For backtesting, indicators are precalculated

    - For live trading, these indicators are recalculated for the each decision cycle

    - Indicators are calculated for each given trading pair, unless specified otherwise

    See :py:class:`CreateIndicatorsProtocolV2` for usage.
    """

    def __init__(self):
        #: Map indicators by the indicator name to their definition
        self.indicators: dict[str, IndicatorDefinition] = {}

    def has_indicator(self, name: str) -> bool:
        """Is an indicator with this name part of the set."""
        return name in self.indicators

    def get_label(self):
        """Comma separated indicator names, or a placeholder text for an empty set."""
        if not self.indicators:
            return "<zero indicators defined>"
        return ", ".join(self.indicators)

    def get_count(self) -> int:
        """How many indicators we have"""
        return len(self.indicators)

    def get_indicator(self, name: str) -> IndicatorDefinition | None:
        """Get a named indicator definition, or ``None`` if there is no such name."""
        return self.indicators.get(name)

    def add(
        self,
        name: str,
        func: Callable,
        parameters: dict | None = None,
        source: IndicatorSource=IndicatorSource.close_price,
    ):
        """Add a new indicator to this indicator set.

        Builds an indicator set for the trading strategy,
        called from `create_indicators`.

        See :py:class:`CreateIndicatorsProtocol` for usage.

        :param name:
            Name of the indicator.

            Human-readable name. If the same function is calculated multiple times, e.g. EMA,
            you can have names like `ema_short` and `ema_long`.

        :param func:
            Python function to be called.

            Function takes arguments from `parameters` dict.
            It must return either :py:class:`pd.DataFrame` or :py:class:`pd.Series`.

        :param parameters:
            Parameters to be passed to the Python function.

            Raw `func` Python arguments.

            You can pass parameters as is from `StrategyParameters`.

        :param source:
            Data source on this indicator is calculated.

            Defaults to the close price for each trading pair.
            To calculate universal indicators set to :py:attr:`IndicatorSource.strategy_universe`.
        """
        assert type(name) == str
        assert callable(func), f"{func} is not callable"

        # No parameters given means the function is called without extra arguments
        parameters = {} if parameters is None else parameters

        assert type(parameters) == dict, f"parameters must be dictionary, we got {parameters.__class__}"
        assert isinstance(source, IndicatorSource), f"Expected IndicatorSource, got {type(source)}"
        assert name not in self.indicators, f"Indicator {name} already added"

        definition = IndicatorDefinition(name, func, parameters, source)
        self.indicators[name] = definition

    def iterate(self) -> Iterable[IndicatorDefinition]:
        """Iterate over all indicator definitions in the set."""
        yield from self.indicators.values()

    def generate_combinations(self, strategy_universe: TradingStrategyUniverse) -> Iterable[IndicatorKey]:
        """Create all individual indicators (per pair) we need to calculate for this trading universe."""
        for definition in self.indicators.values():
            if not definition.is_per_pair():
                # One result for the whole universe, no pair attached
                yield IndicatorKey(None, definition)
                continue

            for pair in strategy_universe.iterate_pairs():
                yield IndicatorKey(pair, definition)

    @staticmethod
    def from_indicator_keys(indicator_keys: set["IndicatorKey"]) -> "IndicatorSet":
        """Reconstruct the original indicator set from keys.

        - Used when grid search passes data around processes
        """
        rebuilt = IndicatorSet()
        by_name = {}
        for key in indicator_keys:
            by_name[key.definition.name] = key.definition
        rebuilt.indicators = by_name
        return rebuilt
class CreateIndicatorsProtocolV1(Protocol):
    """Call signature for create_indicators function.

    Deprecated. See :py:class:`CreateIndicatorsProtocolV2`.
    """

    def __call__(
        self,
        parameters: StrategyParameters,
        indicators: IndicatorSet,
        strategy_universe: TradingStrategyUniverse,
        execution_context: ExecutionContext,
    ):
        """Build technical indicators for the strategy.

        :param parameters:
            Passed from the backtest / live strategy parameters.

            If doing a grid search, each parameter is simplified.

        :param indicators:
            The indicator set to fill in.

            Call :py:meth:`IndicatorSet.add` to add new indicators to the strategy.

        :param strategy_universe:
            The loaded strategy universe.

            Use to resolve symbolic pair information if needed

        :param execution_context:
            Information about if this is a live or backtest run.

        :return:
            This function does not return anything.

            Instead `indicators.add` is used to attach new indicators to the strategy.
        """
class CreateIndicatorsProtocolV2(Protocol):
    """Call signature for create_indicators function.

    This Protocol class defines `create_indicators()` function call signature.
    Strategy modules and backtests can provide a `create_indicators` function
    to define what indicators a strategy needs. These indicators are precalculated and cached for fast performance.

    - There are multiple indicator types, depending on if they are calculated on pair close price,
      pair OHLCV data or the whole strategy universe. See :py:class:`IndicatorSource`.

    - Uses :py:class:`IndicatorSet` class to construct the indicators the strategy can use.

    - To read indicator values in `decide_trades()` function,
      see :py:class:`~tradeexecutor.strategy.strategy_input.StrategyInputIndicators`.

    - For most :py:mod:`pandas_ta` functions, like `pandas_ta.ma`, `pandas_ta.rsi`, `pandas_ta.mfi`, you can pass them directly to
      `indicators.add()` - as those functions have standard argument names like `close`, `high`, `low` that
      are data series provided.

    Example for creating an Exponential Moving Average (EMA) indicator based on the `close` price.
    This example is for a grid search. Unless specified, indicators are assumed to be
    :py:attr:`IndicatorSource.close_price` type and they only use trading pair close price as input.

    .. code-block:: python

        class Parameters:
            stop_loss_pct = [0.9, 0.95]
            cycle_duration = CycleDuration.cycle_1d
            initial_cash = 10_000

            # Indicator values that are searched in the grid search
            slow_ema_candle_count = 7
            fast_ema_candle_count = [1, 2]

        def create_indicators(
            timestamp: datetime.datetime | None,
            parameters: StrategyParameters,
            strategy_universe: TradingStrategyUniverse,
            execution_context: ExecutionContext
        ):
            indicators = IndicatorSet()
            indicators.add("slow_ema", pandas_ta.ema, {"length": parameters.slow_ema_candle_count})
            indicators.add("fast_ema", pandas_ta.ema, {"length": parameters.fast_ema_candle_count})
            return indicators

    Some indicators may use multiple OHLCV datapoints. In this case, you need to tell the indicator to be :py:attr:`IndicatorSource.ohlcv` type.
    Here is an example for Money Flow Index (MFI) indicator:

    .. code-block:: python

        import pandas_ta

        from tradeexecutor.strategy.parameters import StrategyParameters
        from tradeexecutor.strategy.pandas_trader.indicator import IndicatorSet, IndicatorSource

        class Parameters:
            my_mfi_length = 20

        def create_indicators(
            timestamp: datetime.datetime | None,
            parameters: StrategyParameters,
            strategy_universe: TradingStrategyUniverse,
            execution_context: ExecutionContext
        ):
            indicators = IndicatorSet()
            indicators.add(
                "mfi",
                pandas_ta.mfi,
                parameters={"length": parameters.my_mfi_length},
                source=IndicatorSource.ohlcv,
            )
            return indicators

    Indicators can be custom, and do not need to be calculated per trading pair.
    Here is an example of creating indicators "ETH/BTC price" and "ETH/BTC price RSI with length of 20 bars":

    .. code-block:: python

        def calculate_eth_btc(strategy_universe: TradingStrategyUniverse):
            weth_usdc = strategy_universe.get_pair_by_human_description((ChainId.ethereum, "test-dex", "WETH", "USDC"))
            wbtc_usdc = strategy_universe.get_pair_by_human_description((ChainId.ethereum, "test-dex", "WBTC", "USDC"))
            btc_price = strategy_universe.data_universe.candles.get_candles_by_pair(wbtc_usdc.internal_id)
            eth_price = strategy_universe.data_universe.candles.get_candles_by_pair(weth_usdc.internal_id)
            series = eth_price["close"] / btc_price["close"]  # Divide two series
            return series

        def calculate_eth_btc_rsi(strategy_universe: TradingStrategyUniverse, length: int):
            weth_usdc = strategy_universe.get_pair_by_human_description((ChainId.ethereum, "test-dex", "WETH", "USDC"))
            wbtc_usdc = strategy_universe.get_pair_by_human_description((ChainId.ethereum, "test-dex", "WBTC", "USDC"))
            btc_price = strategy_universe.data_universe.candles.get_candles_by_pair(wbtc_usdc.internal_id)
            eth_price = strategy_universe.data_universe.candles.get_candles_by_pair(weth_usdc.internal_id)
            eth_btc = eth_price["close"] / btc_price["close"]
            return pandas_ta.rsi(eth_btc, length=length)

        def create_indicators(
            timestamp: datetime.datetime | None,
            parameters: StrategyParameters,
            strategy_universe: TradingStrategyUniverse,
            execution_context: ExecutionContext
        ) -> IndicatorSet:
            indicators = IndicatorSet()
            indicators.add("eth_btc", calculate_eth_btc, source=IndicatorSource.strategy_universe)
            indicators.add("eth_btc_rsi", calculate_eth_btc_rsi, parameters={"length": parameters.eth_btc_rsi_length}, source=IndicatorSource.strategy_universe)
            return indicators

    This protocol class is second (v2) iteration of the function signature.
    """

    def __call__(
        self,
        timestamp: datetime.datetime | None,
        parameters: StrategyParameters,
        strategy_universe: TradingStrategyUniverse,
        execution_context: ExecutionContext,
    ) -> IndicatorSet:
        """Build technical indicators for the strategy.

        :param timestamp:
            The current live execution timestamp.

            Set ``None`` for backtesting, as `create_indicators()` is called only once during the backtest setup.

        :param parameters:
            Passed from the backtest / live strategy parameters.

            If doing a grid search, each parameter is simplified.

        :param strategy_universe:
            The loaded strategy universe.

            Use to resolve symbolic pair information if needed

        :param execution_context:
            Information about if this is a live or backtest run.

        :return:
            Indicators the strategy is going to need.
        """
#: Use this in function signatures.
#:
#: Accepts both the deprecated V1 and the current V2 ``create_indicators()`` call signature.
#:
CreateIndicatorsProtocol: TypeAlias = CreateIndicatorsProtocolV1 | CreateIndicatorsProtocolV2
def call_create_indicators(
    create_indicators_func: Callable,
    parameters: StrategyParameters,
    strategy_universe: TradingStrategyUniverse,
    execution_context: ExecutionContext,
    timestamp: datetime.datetime = None,
) -> IndicatorSet:
    """Backwards compatible wrapper for create_indicators().

    - Detect which version of the signature `create_indicators_func` implements

    - Legacy (v1) functions declare an `indicators` argument and fill in a set we create for them

    - Current (v2) functions receive `timestamp` first and return the set themselves

    :return:
        The indicator set built by `create_indicators_func`.
    """
    assert callable(create_indicators_func)

    argument_names = inspect.getfullargspec(create_indicators_func).args

    if "indicators" not in argument_names:
        # v2
        return create_indicators_func(timestamp, parameters, strategy_universe, execution_context)

    # v1 backwards
    indicator_set = IndicatorSet()
    create_indicators_func(parameters, indicator_set, strategy_universe, execution_context)
    return indicator_set
@dataclass
class IndicatorResult:
    """One result of an indicator calculation we can store on a disk.

    - Allows storing and reading output of a single precalculated indicator

    - Parameters is a single combination of parameters
    """

    #: The universe for which we calculated the result
    #:
    universe_key: UniverseCacheKey

    #: The key (pair + indicator definition) for which this result was calculated
    #:
    #: The pair of the key is ``None`` for indicators without a trading pair, using
    #: :py:attr:`IndicatorSource.strategy_universe`
    #:
    indicator_key: IndicatorKey

    #: Indicator output is one time series, but in some cases can be multiple as well.
    #:
    #: For example BB indicator calculates multiple series from one close price value.
    #:
    data: pd.DataFrame | pd.Series

    #: Was this indicator result cached or calculated on this run.
    #:
    #: Always cached in a grid search, as indicators are precalculated.
    #:
    cached: bool

    @property
    def pair(self) -> TradingPairIdentifier | None:
        """Trading pair of the indicator key, ``None`` when the key has no pair."""
        return self.indicator_key.pair

    @property
    def definition(self) -> IndicatorDefinition:
        """Indicator definition of the indicator key."""
        return self.indicator_key.definition
#: Indicator results mapped by the indicator key they belong to
IndicatorResultMap: TypeAlias = dict[IndicatorKey, IndicatorResult]
class IndicatorStorage(ABC):
    """Base class for cached indicators and live trading indicators."""

    @abstractmethod
    def is_available(self, key: IndicatorKey) -> bool:
        """Tell if the storage holds a result for the key."""

    @abstractmethod
    def load(self, key: IndicatorKey) -> IndicatorResult:
        """Read the stored result for the key."""

    @abstractmethod
    def save(self, key: IndicatorKey, df: pd.DataFrame | pd.Series) -> IndicatorResult:
        """Store indicator data under the key and return it wrapped as a result."""

    @abstractmethod
    def get_disk_cache_path(self) -> Path | None:
        """Root path of the disk cache, or ``None`` if there is no disk cache."""

    @abstractmethod
    def get_universe_cache_path(self) -> Path | None:
        """Universe-specific cache path, or ``None`` if there is no disk cache."""
class DiskIndicatorStorage(IndicatorStorage):
    """Store calculated indicator results on disk.

    Used in

    - Backtesting

    - Grid search

    Indicators are calculated once and the calculation results can be recycled across multiple backtest runs.

    TODO: Cannot handle multichain universes at the moment, as serialises trading pairs by their ticker.
    """

    def __init__(self, path: Path, universe_key: UniverseCacheKey):
        """
        :param path:
            Root folder of the cache.

        :param universe_key:
            Name of the subfolder under which results for this universe are stored.
        """
        assert isinstance(path, Path)
        assert type(universe_key) == str
        self.path = path
        self.universe_key = universe_key

    def __repr__(self):
        return f"<IndicatorStorage at {self.path}>"

    def get_universe_cache_path(self) -> Path:
        """Folder holding the results of this universe: root path joined with the universe key."""
        return self.path / Path(self.universe_key)

    def get_disk_cache_path(self) -> Path:
        """Root folder of the cache."""
        return self.path

    def get_indicator_path(self, key: IndicatorKey) -> Path:
        """Get the Parquet file where the indicator data is stored.

        :return:
            Example `/tmp/.../test_indicators_single_backtes0/ethereum,1d,WETH-USDC-WBTC-USDC,2021-06-01-2021-12-31/sma(length=21).parquet`
        """
        assert isinstance(key, IndicatorKey)
        return self.get_universe_cache_path() / Path(f"{key.get_cache_key()}.parquet")

    def is_available(self, key: IndicatorKey) -> bool:
        """Does the Parquet file for this key exist."""
        return self.get_indicator_path(key).exists()

    def load(self, key: IndicatorKey) -> IndicatorResult:
        """Load cached indicator data from the disk.

        :return:
            Result flagged as cached. Data read back with exactly one column
            is returned as Series, otherwise as DataFrame.
        """
        assert self.is_available(key), f"Data does not exist: {key}"
        path = self.get_indicator_path(key)
        df = pd.read_parquet(path)

        if len(df.columns) == 1:
            # Convert back to series
            # NOTE(review): a DataFrame saved with a single column also comes back as Series - confirm this is intended
            df = df[df.columns[0]]

        return IndicatorResult(
            self.universe_key,
            key,
            df,
            cached=True,
        )

    def save(self, key: IndicatorKey, df: pd.DataFrame | pd.Series) -> IndicatorResult:
        """Atomic replacement of the existing data.

        - Avoid leaving partially written files: data is written to a temporary file
          in the target folder and moved over the final path when complete

        - If writing or moving fails, the temporary file is removed and the error re-raised

        :return:
            Result flagged as not cached, carrying `df` as it was passed in.
        """
        assert isinstance(key, IndicatorKey)

        if isinstance(df, pd.Series):
            # For saving, create a DataFrame with a single column "value"
            save_df = pd.DataFrame({"value": df})
        else:
            save_df = df

        assert isinstance(save_df, pd.DataFrame), f"Expected DataFrame, got: {type(df)}"

        path = self.get_indicator_path(key)
        dirname = path.parent
        os.makedirs(dirname, exist_ok=True)

        # Temporary file lives in the same folder as the final file
        temp = tempfile.NamedTemporaryFile(mode='wb', delete=False, dir=dirname)
        try:
            # Context manager closes the handle even if the Parquet write fails
            with temp:
                save_df.to_parquet(temp)
            # https://stackoverflow.com/a/3716361/315168
            shutil.move(temp.name, path)
        except BaseException:
            # Do not leave orphaned temporary files in the cache folder
            Path(temp.name).unlink(missing_ok=True)
            raise

        logger.info("Saved %s", path)

        return IndicatorResult(
            universe_key=self.universe_key,
            indicator_key=key,
            data=df,
            cached=False,
        )

    @staticmethod
    def create_default(
        universe: TradingStrategyUniverse,
        default_path=DEFAULT_INDICATOR_STORAGE_PATH,
    ) -> "DiskIndicatorStorage":
        """Get the indicator storage with the default cache path."""
        return DiskIndicatorStorage(default_path, universe.get_cache_key())
class MemoryIndicatorStorage(IndicatorStorage):
    """Keep calculated indicator results in a dictionary held by this object.

    Used in

    - Live trading

    - Indicators are calculated just before `decide_trades()` is called

    - Indicators are recalculated on every decision cycle
    """

    def __init__(self, universe_key: UniverseCacheKey):
        self.universe_key = universe_key
        #: Saved results by their indicator key
        self.results: dict[IndicatorKey, IndicatorResult] = {}

    def is_available(self, key: IndicatorKey) -> bool:
        """Has a result been saved for this key."""
        return key in self.results

    def load(self, key: IndicatorKey) -> IndicatorResult:
        """Return the result saved for this key."""
        return self.results[key]

    def save(self, key: IndicatorKey, df: pd.DataFrame | pd.Series) -> IndicatorResult:
        """Wrap the data as a non-cached result, remember it under the key and return it."""
        stored = IndicatorResult(
            universe_key=self.universe_key,
            indicator_key=key,
            data=df,
            cached=False,
        )
        self.results[key] = stored
        return stored

    def get_disk_cache_path(self) -> Path | None:
        """No disk cache is used."""
        return None

    def get_universe_cache_path(self) -> Path | None:
        """No disk cache is used."""
        return None
def _serialise_parameters_for_cache_key(parameters: dict) -> str:
for k, v in parameters.items():
assert type(k) == str
assert type(v) not in (list, tuple) # Don't leak test ranges here - must be a single value
return "".join([f"{k}={v}" for k, v in parameters.items()])
def _load_indicator_result(storage: DiskIndicatorStorage, key: IndicatorKey) -> IndicatorResult:
    """Read one indicator result from the storage.

    The key must be available in the storage.
    """
    logger.info("Loading %s", key)
    is_cached = storage.is_available(key)
    assert is_cached, f"Tried to load indicator that is not in the cache: {key}"
    result = storage.load(key)
    return result
def _calculate_and_save_indicator_result(
    strategy_universe: TradingStrategyUniverse,
    storage: DiskIndicatorStorage,
    key: IndicatorKey,
) -> IndicatorResult:
    """Calculate a single indicator result and write it to the storage.

    - Per-pair indicators are fed with the pair's open price, close price or full candles,
      depending on the indicator source

    - Other indicators are calculated over the whole strategy universe

    :return:
        The result as returned by the storage.
    """
    definition = key.definition

    # Single price column needed by the price based sources
    price_columns = {
        IndicatorSource.open_price: "open",
        IndicatorSource.close_price: "close",
    }

    def read_pair_samples():
        return strategy_universe.data_universe.candles.get_samples_by_pair(key.pair.internal_id)

    if not definition.is_per_pair():
        # Calculate indicator over the whole universe
        data = definition.calculate_universe(strategy_universe)
    else:
        assert key.pair.internal_id, f"Per-pair indicator lacks pair internal_id: {key.pair}"

        source = definition.source
        if source in price_columns:
            column = price_columns[source]
            input = read_pair_samples()[column]
            data = definition.calculate_by_pair(input)
        elif source == IndicatorSource.ohlcv:
            input = read_pair_samples()
            data = definition.calculate_by_pair_ohlcv(input)
        else:
            raise AssertionError(f"Unsupported input source {key.pair} {key.definition} {definition.source}")

    assert data is not None, f"Indicator function {definition.name} ({definition.func}) did not return any result, received Python None instead"

    return storage.save(key, data)
def load_indicators(
    strategy_universe: TradingStrategyUniverse,
    storage: DiskIndicatorStorage,
    indicator_set: IndicatorSet,
    all_combinations: set[IndicatorKey],
    max_readers=8,
    show_progress=True,
) -> IndicatorResultMap:
    """Load cached indicators.

    - Use a thread pool to speed up IO

    :param strategy_universe:
        Used for the pair count shown in the progress bar label.

    :param storage:
        The cache backend we use for the storage

    :param indicator_set:
        Used for the indicator names shown in the progress bar label.

    :param all_combinations:
        Load all cached indicators of this set if they are available in the storage.

    :param max_readers:
        Number of reader threads we allocate for the task.

        With one or less, results are read one by one in the calling thread.

    :param show_progress:
        Display a progress bar while reading.

    :return:
        Results that were available in the storage, by their key.
        Empty if nothing was available.
    """
    # Only those combinations that are in the storage can be loaded
    task_args = [(storage, key) for key in all_combinations if storage.is_available(key)]

    logger.info(
        "Loading cached indicators indicators, we have %d indicator combinations out of %d available in the cache %s",
        len(task_args),
        len(all_combinations),
        storage.get_universe_cache_path()
    )

    if len(task_args) == 0:
        return {}

    results = {}
    label = indicator_set.get_label()

    if show_progress:
        progress_bar = tqdm(total=len(task_args), desc=f"Reading cached indicators {label} for {strategy_universe.get_pair_count()} pairs, using {max_readers} threads")
    else:
        progress_bar = None

    try:
        if max_readers > 1:
            logger.info("Multi-thread reading")
            executor = futureproof.ThreadPoolExecutor(max_workers=max_readers)
            tm = futureproof.TaskManager(executor, error_policy=futureproof.ErrorPolicyEnum.RAISE)

            # Run the checks parallel using the thread pool
            tm.map(_load_indicator_result, task_args)

            # Extract results from the parallel task queue
            loaded = (task.result for task in tm.as_completed())
        else:
            logger.info("Single-thread reading")
            loaded = itertools.starmap(_load_indicator_result, task_args)

        for result in loaded:
            key = result.indicator_key
            assert key not in results
            results[key] = result
            # Advance the bar on both code paths.
            # Explicit None check, as the truth value of a progress bar object is not ours to define.
            if progress_bar is not None:
                progress_bar.update()

        return results
    finally:
        if progress_bar is not None:
            progress_bar.close()
def calculate_indicators(
    strategy_universe: TradingStrategyUniverse,
    storage: DiskIndicatorStorage,
    indicators: IndicatorSet | None,
    execution_context: ExecutionContext,
    remaining: set[IndicatorKey],
    max_workers=8,
    label: str | None = None,
    verbose=True,
) -> IndicatorResultMap:
    """Calculate indicators for which we do not have cached data yet.

    - Use a process pool to calculate indicators in parallel when `max_workers` is more than one

    - Otherwise calculate one by one in the calling process, which is good for debuggers

    :param strategy_universe:
        The universe handed to each calculation.

    :param storage:
        Where each calculated result is saved.

    :param indicators:
        Indicator set we calculate for.

        Can be ``None`` for a grid search, as each individual combination may has its own set.

    :param execution_context:
        Only type-checked here.

    :param remaining:
        Remaining indicator combinations for which we do not have a cached result

    :param max_workers:
        Number of child processes.

    :param label:
        Label for logging and the progress bar.

        If not given, use indicator names, or a generic text if `indicators` is not given either.

    :param verbose:
        Stdout user printing with helpful messages.

        Currently not read by this function.

    :return:
        Calculated results by their key. Empty if there was nothing to calculate.
    """
    assert isinstance(execution_context, ExecutionContext), f"Expected ExecutionContext, got {type(execution_context)}"

    results: IndicatorResultMap

    if label is None:
        if indicators is not None:
            label = indicators.get_label()
        else:
            label = "Indicator calculation"

    logger.info("Calculating indicators: %s", label)

    if len(remaining) == 0:
        logger.info("Nothing to calculate")
        return {}

    task_args = [(strategy_universe, storage, key) for key in remaining]

    results = {}

    if max_workers > 1:
        # Do a parallel scan for the maximum speed
        #
        # Set up a futureproof task manager
        #
        # For futureproof usage see
        # https://github.com/yeraydiazdiaz/futureproof
        #
        # Run individual searchers in child processes
        #
        # The pickled universe is handed to the initializer of each child process
        #
        pickled_universe = pickle.dumps(strategy_universe)
        logger.info("Doing a multiprocess indicator calculation, picked universe is %d bytes", len(pickled_universe))

        # Set up a process pool executing structure
        executor = futureproof.ProcessPoolExecutor(max_workers=max_workers, initializer=_process_init, initargs=(pickled_universe,))
        tm = futureproof.TaskManager(executor, error_policy=futureproof.ErrorPolicyEnum.RAISE)

        # Set up a signal handler to stop child processes on quit
        setup_indicator_multiprocessing(executor)

        # Run the tasks
        tm.map(_calculate_and_save_indicator_result, task_args)

        # Track the child process completion using tqdm progress bar
        with tqdm(total=len(task_args), desc=f"Calculating indicators {label} using {max_workers} processes") as progress_bar:
            # Extract results from the parallel task queue
            for task in tm.as_completed():
                result = task.result
                results[result.indicator_key] = result
                progress_bar.update()

    else:
        # Do single thread - good for debuggers like pdb/ipdb
        #
        # The universe travels in task_args, no process-wide state is needed here
        logger.info("Doing a single thread indicator calculation")
        pending = itertools.starmap(_calculate_and_save_indicator_result, task_args)

        # Consume the lazy iterator to run the calculations
        result: IndicatorResult
        for result in pending:
            results[result.indicator_key] = result

    logger.info("Total %d indicator results calculated", len(results))

    return results
def prepare_indicators(
    create_indicators: CreateIndicatorsProtocol,
    parameters: StrategyParameters,
    strategy_universe: TradingStrategyUniverse,
    execution_context: ExecutionContext,
    timestamp: datetime.datetime = None,
):
    """Call the strategy module indicator builder.

    Logs a warning if the builder did not create any indicators.

    :return:
        The indicator set created by `create_indicators`.
    """
    indicator_set = call_create_indicators(
        create_indicators,
        parameters,
        strategy_universe,
        execution_context,
        timestamp=timestamp,
    )

    if not indicator_set.get_count():
        # TODO: Might have legit use cases?
        logger.warning("create_indicators() did not create a single indicator")

    return indicator_set
def calculate_and_load_indicators(
    strategy_universe: TradingStrategyUniverse,
    storage: IndicatorStorage,
    execution_context: ExecutionContext,
    parameters: StrategyParameters | None = None,
    indicators: IndicatorSet | None = None,
    create_indicators: CreateIndicatorsProtocolV1 | None = None,
    max_workers: int | Callable = get_safe_max_workers_count,
    max_readers: int | Callable = get_safe_max_workers_count,
    verbose=True,
    timestamp: datetime.datetime = None,
) -> IndicatorResultMap:
    """Load cached indicator results and calculate the ones that are missing.

    - Expand the indicator set to every combination needed for the universe

    - Read what is already available through ``load_indicators()``

    - Pass the remaining combinations to ``calculate_indicators()``

    - Merge cached and freshly calculated results into one map

    :param strategy_universe:
        Universe for which the indicator combinations are generated.

    :param storage:
        Indicator storage passed to the load and calculate steps.

    :param execution_context:
        Forwarded to indicator creation and calculation.

    :param parameters:
        Strategy parameters. Must be given together with ``create_indicators``.

    :param indicators:
        A ready-made indicator set. Mutually exclusive with ``create_indicators``.

    :param create_indicators:
        Indicator factory used to build the set through ``prepare_indicators()``.

    :param max_workers:
        Worker count for calculation, or a zero-argument callable returning it.

    :param max_readers:
        Reader count for cache loading, or a zero-argument callable returning it.

    :param verbose:
        If set, and there is at least one indicator combination,
        print the indicator cache path to stdout.

    :param timestamp:
        Forwarded to ``prepare_indicators()`` when ``create_indicators`` is used.

    :return:
        Cached results merged with calculated results, keyed by indicator key.
    """

    # The counts may be given as callables: resolve them to plain values first
    max_workers = max_workers() if callable(max_workers) else max_workers
    max_readers = max_readers() if callable(max_readers) else max_readers

    assert create_indicators or indicators, "You must give either create_indicators or indicators argument"

    if create_indicators:
        assert indicators is None, "Give either indicators or create_indicators, not both"
        assert parameters is not None, "parameters argument must be given if you give create_indicators"
        indicators = prepare_indicators(
            create_indicators,
            parameters,
            strategy_universe,
            execution_context,
            timestamp=timestamp,
        )

    assert isinstance(indicators, IndicatorSet), f"Got class {type(indicators)} when IndicatorSet expected"

    all_combinations = set(indicators.generate_combinations(strategy_universe))

    logger.info(
        "Loading indicators %s for the universe %s, storage is %s",
        indicators.get_label(),
        strategy_universe.get_cache_key(),
        storage.get_disk_cache_path(),
    )

    cached = load_indicators(
        strategy_universe,
        storage,
        indicators,
        all_combinations,
        max_readers=max_readers,
    )

    # Everything coming out of the cache must be a combination we asked for
    for key in cached.keys():
        assert key in all_combinations, f"Loaded a cached result {key} is not in part of the all combinations we expected"

    if verbose and all_combinations:
        print(f"Using indicator cache {storage.get_universe_cache_path()}")

    # Only the combinations the cache could not serve are calculated
    calculation_needed = all_combinations.difference(cached.keys())

    calculated = calculate_indicators(
        strategy_universe,
        storage,
        indicators,
        execution_context,
        calculation_needed,
        max_workers=max_workers,
    )

    result = cached | calculated

    # The merged map must not contain keys outside the requested combinations
    assert all(key in all_combinations for key in result.keys())

    return result
def warm_up_indicator_cache(
    strategy_universe: TradingStrategyUniverse,
    storage: DiskIndicatorStorage,
    execution_context: ExecutionContext,
    indicators: set[IndicatorKey],
    max_workers=8,
) -> tuple[set[IndicatorKey], set[IndicatorKey]]:
    """Make sure every given indicator key has a result in the storage.

    - Used for grid search

    - Keys the storage reports as available are left untouched

    - All other keys are handed to ``calculate_indicators()``

    :param strategy_universe:
        Universe passed to the calculation.

    :param storage:
        Storage queried with ``is_available()`` and passed to the calculation.

    :param execution_context:
        Forwarded to the calculation.

    :param indicators:
        Indicator keys to warm up.

    :param max_workers:
        Worker count forwarded to ``calculate_indicators()``.

    :return:
        Tuple (keys that were already cached, keys that were passed on for calculation)
    """
    already_cached = set()
    needed = set()
    needed_for_pairs = set()
    needed_for_universe = set()

    for key in indicators:
        if storage.is_available(key):
            already_cached.add(key)
            continue

        needed.add(key)
        # Keys without a pair are universe-wide; the split is only used for logging
        bucket = needed_for_universe if key.pair is None else needed_for_pairs
        bucket.add(key)

    logger.info(
        "warm_up_indicator_cache(), we have %d cached pair-indicators results and need to calculate %d results, of which %d pair indicators and %d universe indicators",
        len(already_cached),
        len(needed),
        len(needed_for_pairs),
        len(needed_for_universe),
    )

    fresh_results = calculate_indicators(
        strategy_universe,
        storage,
        None,
        execution_context,
        needed,
        max_workers=max_workers,
        label=f"Calculating {len(needed)} indicators for the grid search",
    )

    logger.info("Calculated %d indicator results", len(fresh_results))

    return already_cached, needed
#: Process global stored universe for multiprocess workers
#:
#: Populated by _process_init() from the pickled universe it receives.
#:
_universe: TradingStrategyUniverse | None = None

#: Process pool that _handle_sigterm() shuts down and whose worker processes it kills
#:
_process_pool: concurrent.futures.ProcessPoolExecutor | None = None
def _process_init(pickled_universe):
    """Child worker process initialiser.

    Unpickles the universe and stores it in the module-level ``_universe``
    so that it is available as a process global.

    :param pickled_universe:
        Pickled bytes of the universe.
        The payload is fed to ``pickle.loads()``, so it must come from a trusted source.
    """
    global _universe
    restored_universe = pickle.loads(pickled_universe)
    _universe = restored_universe
def _handle_sigterm(*args):
    """Signal handler: stop the indicator worker processes and exit with status 1.

    - Reads the process pool from the module-level ``_process_pool``

    - If no pool has been registered, only exits

    - Does not wait for running tasks: pending work is cancelled and
      worker processes are killed

    :param args:
        Signal handler arguments. Ignored.

    :raise SystemExit:
        Always, with exit code 1.
    """
    # TODO: Despite all the effort, this does not seem to work with Visual Studio Code's Interrupt Kernel button
    pool = _process_pool

    if pool is not None:
        # Snapshot the workers before shutdown, as the executor's process table
        # may be unset (None) once the executor has been shut down
        processes: list[Process] = list((pool._processes or {}).values())

        # shutdown() waits for running tasks by default, which would block
        # the handler until the work is finished - the opposite of aborting
        pool.shutdown(wait=False, cancel_futures=True)

        for p in processes:
            p.kill()

    sys.exit(1)
def setup_indicator_multiprocessing(executor):
    """Set up multiprocessing for indicators.

    - We use multiprocessing to calculate indicators

    - We want to be able to abort the calculation and stop the child processes

    - Registers the underlying process pool in the module-level ``_process_pool``,
      which is the global ``_handle_sigterm()`` reads

    - Installs ``_handle_sigterm()`` as the ``SIGTERM`` handler, but only when
      called from the main thread

    :param executor:
        Executor wrapper whose ``_executor`` attribute is the underlying process pool.
    """
    global _process_pool, _process_pool_executor

    # _handle_sigterm() looks the pool up as _process_pool; storing it only under
    # another name would leave the handler without a pool to shut down
    _process_pool = executor._executor

    # Keep the previously used global name bound as well, for backward compatibility
    _process_pool_executor = _process_pool

    # Enable graceful multiprocessing termination only if we run as a backtesting noteboook
    # pytest work around for: test_trading_strategy_engine_v050_live_trading
    # ValueError: signal only works in main thread of the main interpreter
    # https://stackoverflow.com/a/23207116/315168
    if threading.current_thread() is threading.main_thread():
        signal.signal(signal.SIGTERM, _handle_sigterm)
def validate_function_kwargs(func: Callable, kwargs: dict):
    """Check that every key of ``kwargs`` is a parameter name of ``func``.

    Designed to be used with pandas_ta functions -
    many special cases needs to be added.

    A key passes only if it appears by name in the function signature.

    :param func:
        TA function

    :param kwargs:
        Parameters we think function can take

    :raise IndicatorFunctionSignatureMismatch:
        On the first key that is not a named parameter of the function,
        e.g. because of a typo.
    """

    # https://stackoverflow.com/a/64504028/315168
    assert callable(func)

    allowed_params = inspect.signature(func).parameters

    for our_param in kwargs.keys():
        if our_param in allowed_params:
            continue
        raise IndicatorFunctionSignatureMismatch(f"Function {func} does not take argument {our_param}. Available arguments are: {allowed_params}.")