Source code for tradeexecutor.strategy.pandas_trader.indicator_decorator

"""Decorator syntax for indicators.

- Allow shorter definition of indicators using Python decorators, over the manual methods provided in :py:mod:`tradeexecutor.strategy.pandas_trade.indicator` module.

- See :py:class:`IndicatorRegistry` for usage.


"""
import inspect
from dataclasses import dataclass
import datetime
from functools import wraps
from typing import Callable, Iterable, Sequence

import pandas as pd

from tradeexecutor.strategy.execution_context import ExecutionContext
from tradeexecutor.strategy.pandas_trader.indicator import IndicatorSet, IndicatorSource, CreateIndicatorsProtocolV1, CreateIndicatorsProtocolV2
from tradeexecutor.strategy.parameters import StrategyParameters
from tradeexecutor.strategy.trading_strategy_universe import TradingStrategyUniverse


class NotYetDefined(Exception):
    """There is a dependency resolution order issue with indicators."""
    pass


class ParameterMissing(Exception):
    """Indicator asks for a parameter that is missing."""
    pass


#: List of function arguments for different indicator functions that are built-in,
#: and do not come from StrategyParameters definitions.
#:
#: Each indicator source may have different arguments available.
#:
DEFAULT_INDICATOR_ARGUMENTS = {
    "close",
    "open",
    "high",
    "low",
    "volume",
    "strategy_universe",
    "dependency_resolver",
    "pair",
    "execution_context",
    "timestamp"
}


[docs]@dataclass(slots=True, frozen=True) class IndicatorDecoration: """Internal struct capturing information we need about indicators.""" #: Indicator name name: str #: Indicator function func: Callable #: Source data type source: IndicatorSource | None #: Linerised order for the parallel execution order: int #: Name of function arguments sans :py:attr:`DEFAULT_INDICATOR_ARGUMENTS` args: set[str] #: Dependencies needed to run this indicator dependencies: list[Callable]
[docs]def extract_args(func) -> set[str]: """Get the names of Python function arguments.""" signature = inspect.signature(func) return set(signature.parameters.keys()) - DEFAULT_INDICATOR_ARGUMENTS
[docs]def detect_source(func: Callable, source: IndicatorSource | None) -> IndicatorSource: """Based on indicator function arguments, detect what kind of data source it needs.""" if source is not None: return source signature = inspect.signature(func) args = set(signature.parameters.keys()) if "strategy_universe" in args: return IndicatorSource.strategy_universe elif ("open" in args) or ("high" in args) or ("low" in args) or ("volume" in args): return IndicatorSource.ohlcv elif ("close" in args): return IndicatorSource.close_price else: raise RuntimeError(f"Cannot detect IndicatorSource for function {func} - please manually specify using @indicators.define(source) argument")
[docs]class IndicatorRegistry: """Decorator-based helper class for defining strategy indicators. Use ``@indicatores.define()`` decorator to create strategy technical indicators in your notebooks. - Less typing than with manuaal ``indicator.add()`` syntax - `Parameters` class members are automatically mapped to the function argument with the same name - Indicators list other indicator functions they depend on using syntax ``@indicators.define(dependencies=(slow_sma, fast_sma))``, this is automatically resolved to the current dependency resolution order when the indicator calulation order is determined. - Indicator data can be looked up by indicator function or its string name Example: .. code-block:: python import pandas_ta from tradeexecutor.strategy.pandas_trader.indicator_decorator import IndicatorRegistry class Parameters: rsi_length = 20 indicators = IndicatorRegistry() @indicators.define() def rsi(close, rsi_length): return pandas_ta.rsi(close, rsi_length) # Calculate indicators - will spawn multiple worker processed, # or load cached results from the disk parameters = StrategyParameters.from_class(Parameters) indicators = calculate_and_load_indicators_inline( strategy_universe=strategy_universe, parameters=parameters, create_indicators=indicators.create_indicators, ) With indicators that depend on each other: .. code-block:: python import pandas_ta from tradeexecutor.strategy.pandas_trader.indicator_decorator import IndicatorRegistry class Parameters: fast_ma_length = 7 slow_ma_length = 21 indicators = IndicatorRegistry() @indicators.define() def slow_sma(close, slow_ma_length): return pandas_ta.sma(close, slow_ma_length) @indicators.define() def fast_sma(close, fast_ma_length): return pandas_ta.sma(close, fast_ma_length) @indicators.define(dependencies=(slow_sma, fast_sma)) def ma_crossover( close: pd.Series, pair: TradingPairIdentifier, dependency_resolver: IndicatorDependencyResolver, ) -> pd.Series: # You can use name or function to look up previously calcualted indicators data slow_sma: pd.Series = dependency_resolver.get_indicator_data(slow_sma) fast_sma: pd.Series = dependency_resolver.get_indicator_data("fast_sma") return fast_sma > slow_sma """
[docs] def __init__(self): self.registry: dict[str, IndicatorDecoration] = {}
[docs] def define( self, source: IndicatorSource=None, order: None | int=None, dependencies: Sequence[Callable]=None, ): """Function decorator to define indicator functions in your notebook. For the usage see :py:class:`IndicatorRegistry`. Short example: .. code-block:: python class Parameters: slow_ma_length = 21 indicators = IndicatorRegistry() @indicators.define() def slow_sma(close, slow_ma_length): return pandas_ta.sma(close, slow_ma_length) :param source: What kind of trading data this indicator needs as input. - Per-pair - World - Raw data - Data from earlier calculated indicators :param order: Manually define indicator order. Not usually needed. :param dependencies: List of functions which result value this indicator consumes. Needed with :py:attr:`~tradeexecutor.strategy.pandas_trader.indicator.IndicatorSource.dependencies_only_per_pair` and :py:attr:`~tradeexecutor.strategy.pandas_trader.indicator.IndicatorSource.dependencies_only_universe`. """ def decorator(func): nonlocal dependencies name = func.__name__ dependencies = list(dependencies) if dependencies else [] resolved_order = order if order else self.detect_order(func, dependencies) self.registry[name] = IndicatorDecoration( name=name, func=func, source=detect_source(func, source), order=resolved_order, args=extract_args(func), dependencies=dependencies, ) @wraps(func) def wrapper(*args, **kwargs): # Call the original function with the original arguments return func(*args, **kwargs) return wrapper return decorator
[docs] def detect_order(self, func: Callable, dependencies: list[Callable]) -> int: """Automatically resolve the order of indicators. :return: Linear order number for running the indicator. Can be used to parallerise the process. """ if dependencies is None: return 1 assert isinstance(dependencies, Iterable), f"Bad dependencies, does not look like a list: {dependencies} in {func}" for dependency in dependencies: assert callable(dependency), f"Dependency must be specified as a function, got {dependency} in {func}" if dependency.__name__ not in self.registry: already_defined = list(self.registry.keys()) raise NotYetDefined(f"Function {func} asks for dependency {dependency} which is not yet defined.\nWe have defined {already_defined}") dep_names = [func.__name__ for func in dependencies] max_order = max((dec.order for dec in self.registry.values() if dec.name in dep_names), default=0) return max_order + 1
[docs] def create_indicators( self, timestamp: datetime.datetime, parameters: StrategyParameters, strategy_universe: TradingStrategyUniverse, execution_context: ExecutionContext, ) -> IndicatorSet: """Hook function to be passed to :py:func:`calculate_and_load_indicators_inline`. Example: .. code-block:: python indicators = IndicatorRegistry() @indicators.define() def rsi(close, rsi_length): return pandas_ta.rsi(close, rsi_length) # Calculate indicators - will spawn multiple worker processed, # or load cached results from the disk parameters = StrategyParameters.from_class(Parameters) indicators = calculate_and_load_indicators_inline( strategy_universe=strategy_universe, parameters=parameters, create_indicators=indicators.create_indicators, ) """ indicators = IndicatorSet() for name, definition in self.registry.items(): applied_parameters = {} for parameter in definition.args: if parameter not in parameters: raise ParameterMissing(f"Function {name} requires parameter {parameter}, but this is not defined in strategy parameters.\nWe have: {list(parameters.keys())}") applied_parameters[parameter] = parameters[parameter] indicators.add( name=definition.name, func=definition.func, source=definition.source, order=definition.order, parameters=applied_parameters, ) return indicators
[docs] def get_diagnostics(self) -> pd.DataFrame: """Return a table that explains registered indicators. :return: Human-readable table """ data = [] for entry in self.registry.values(): data.append({ "Order": entry.order, "Name": entry.name, "Source": entry.source.name, "Parameters": ", ".join(entry.args), "Dependencies": ", ".join(d.__name__ for d in entry.dependencies), }) df = pd.DataFrame(data) df = df.sort_values(by=["Order", "Name"]) df = df.set_index("Name") return df