Source code for tradeexecutor.visual.utils

from typing import Optional
import pandas as pd
import logging

import datetime

import plotly.graph_objects as go

from tradeexecutor.strategy.execution_context import ExecutionContext
from tradingstrategy.charting.candle_chart import VolumeBarMode

from tradeexecutor.state.portfolio import Portfolio
from tradeexecutor.state.state import State
from tradeexecutor.state.trade import TradeExecution
from tradeexecutor.state.types import PairInternalId
from tradeexecutor.state.visualisation import Plot, PlotKind, PlotLabel
from tradeexecutor.strategy.trade_pricing import format_fees_dollars


logger = logging.getLogger(__name__)


[docs]def get_start_and_end( start_at: pd.Timestamp | datetime.datetime | None, end_at: pd.Timestamp | datetime.datetime | None, ): """Get and validate start and end timestamps""" if isinstance(start_at, datetime.datetime): start_at = pd.Timestamp(start_at) if isinstance(end_at, datetime.datetime): end_at = pd.Timestamp(end_at) if start_at is not None: assert isinstance(start_at, pd.Timestamp) if end_at is not None: assert isinstance(end_at, pd.Timestamp) return start_at, end_at
[docs]def export_trade_for_dataframe(p: Portfolio, t: TradeExecution) -> dict: """Export data for a Pandas dataframe presentation. - Decimal roundings are based on rule of thumb and may need to be tuned """ position = p.get_position_by_id(t.position_id) reserve_token_symbol = position.reserve_currency.token_symbol base_token_symbol = position.pair.get_pricing_pair().base.token_symbol price_prefix = f"{base_token_symbol} / USD" label = ["-" * 60] is_profitable = None if t.is_failed(): label += ["Failed trade"] type = "failed" elif t.is_repaired(): label += ["Repaired trade"] type = "failed" else: if t.is_stop_loss(): position = p.find_position_for_trade(t) profitable = position.is_profitable() type = "stop-loss" label += [ f"Stop loss {base_token_symbol}", "", f"Triggered at: {position.stop_loss:.4f} {price_prefix}", ] is_profitable = True if profitable else False elif t.is_take_profit(): type = "take-profit" label += [ f"Take profit {base_token_symbol}", "", f"Triggered at: {position.take_profit:.4f} {price_prefix}", ] elif t.is_credit_supply(): if t.is_buy(): type = "open-credit-supply" label += [ f"Open credit supply {reserve_token_symbol}", "", ] else: type = "close-credit-supply" label += [ f"Close credit supply {reserve_token_symbol}", "", ] elif t.is_sell(): type = "sell" label += [ f"Sell {base_token_symbol}", "", ] elif t.is_buy(): type = "buy" label += [ f"Buy {base_token_symbol}", "", ] label += [ # "", f"Executed at: {t.executed_at}", f"Value: {t.get_value():.4f} USD", f"Quantity: {abs(t.get_position_quantity()):.6f} {base_token_symbol}", # "", ] label += [ # f"Mid-price: {t.planned_mid_price:.4f} {price_prefix}" # if t.planned_mid_price # else "", f"Executed at price: {t.executed_price:.4f} {price_prefix}" if t.executed_price else "", # f"Estimated execution price: {t.planned_price:.4f} {price_prefix}" # if t.planned_price # else "", # "", ] if t.is_success() and t.lp_fees_paid is not None: label += [ f"Fees paid: {format_fees_dollars(t.get_fees_paid())}", ] if t.cost_of_gas: label += [f"Gas fee: {t.cost_of_gas:.4f}"] if t.notes: label += t.notes.split("\n") # See Plotly Scatter usage https://stackoverflow.com/a/61349739/315168 return { "timestamp": t.executed_at, "success": t.is_success(), "type": type, "label": "<br>".join(label), "price": t.planned_mid_price if t.planned_mid_price else t.planned_price, "is_profitable": is_profitable, }
[docs]def export_trades_as_dataframe( portfolio: Portfolio, pair_id: PairInternalId, start: Optional[pd.Timestamp] = None, end: Optional[pd.Timestamp] = None, include_credit_supply_positions: bool = False, ) -> pd.DataFrame: """Convert executed trades to a dataframe, so it is easier to work with them in Plotly. :param start_at: Crop range :param end_at: Crop range """ if start: if isinstance(start, datetime.datetime): start = pd.Timestamp(start) assert isinstance(start, pd.Timestamp), f"Got {start} {start.__class__}" if end: if isinstance(end, datetime.datetime): end = pd.Timestamp(end) assert isinstance(end, pd.Timestamp), f"Got {end} {end.__class__}" assert end data = [] for t in portfolio.get_all_trades(): if t.is_credit_supply(): if not include_credit_supply_positions: continue elif ( pair_id is not None and t.pair.get_pricing_pair().internal_id != pair_id ): continue # Crop if start or end: s = t.opened_at or t.started_at if not s: # Hotfix to some invalid data? logger.info("Trade lacks start date: %s", t) continue if s < start or s > end: continue data.append(export_trade_for_dataframe(portfolio, t)) return pd.DataFrame(data)
[docs]def visualise_trades( fig: go.Figure, candles: pd.DataFrame, trades_df: pd.DataFrame, candlestick_row: int | None = None, column: int | None = None, include_credit_supply_positions: bool = False, ): """Plot individual trades over the candlestick chart.""" # If we have used stop loss, do different categories if include_credit_supply_positions: advanced_trade_types = ("stop-loss", "take-profit", "open-credit-supply", "close-credit-supply") else: advanced_trade_types = ("stop-loss", "take-profit") advanced_trades = ( len(trades_df.loc[trades_df["type"].isin(advanced_trade_types)]) > 0 ) stop_loss_df = None take_profit_df = None open_credit_supply_df = None close_credit_supply_df = None if advanced_trades: buys_df = trades_df.loc[trades_df["type"] == "buy"] sells_df = trades_df.loc[trades_df["type"] == "sell"] stop_loss_df = trades_df.loc[trades_df["type"] == "stop-loss"].copy() take_profit_df = trades_df.loc[trades_df["type"] == "take-profit"] if include_credit_supply_positions: open_credit_supply_df = trades_df.loc[trades_df["type"] == "open-credit-supply"] close_credit_supply_df = trades_df.loc[trades_df["type"] == "close-credit-supply"] else: buys_df = trades_df.loc[trades_df["type"] == "buy"] sells_df = trades_df.loc[trades_df["type"] == "sell"] # Buys fig.add_trace( go.Scatter( name="Buy", mode="markers", x=buys_df["timestamp"], y=buys_df["price"], text=buys_df["label"], marker={ "color": "#aaaaff", "symbol": "triangle-right", "size": 12, "line": {"width": 1, "color": "#3333aa"}, }, hoverinfo="text", ), secondary_y=False, row=candlestick_row, col=column, ) # Sells fig.add_trace( go.Scatter( name="Sell", mode="markers", x=sells_df["timestamp"], y=sells_df["price"], text=sells_df["label"], marker={ "color": "#aaaaff", "symbol": "triangle-left", "size": 12, "line": {"width": 1, "color": "#3333aa"}, }, hoverinfo="text", ), secondary_y=False, row=candlestick_row, col=column, ) if stop_loss_df is not None: stop_loss_df['colour'] = stop_loss_df['is_profitable'].apply(lambda x: 'green' if x == True else 'orangered') fig.add_trace( go.Scatter( name="Stop loss", mode="markers", x=stop_loss_df["timestamp"], y=stop_loss_df["price"], text=stop_loss_df["label"], marker={ "symbol": "arrow-down", "size": 12, "line": {"width": 1, "color": "black"}, "color": stop_loss_df['colour'], }, hoverinfo="text", ), secondary_y=False, row=candlestick_row, col=column, ) if take_profit_df is not None: fig.add_trace( go.Scatter( name="Take profit", mode="markers", x=take_profit_df["timestamp"], y=take_profit_df["price"], text=take_profit_df["label"], marker={ "symbol": "arrow-up", "size": 12, "line": {"width": 1, "color": "black"}, "color": "lightgreen", }, hoverinfo="text", ), secondary_y=False, row=candlestick_row, col=column, ) if open_credit_supply_df is not None: fig.add_trace( go.Scatter( name="Open credit supply", mode="markers", x=open_credit_supply_df["timestamp"], y=open_credit_supply_df["price"], text=open_credit_supply_df["label"], marker={ "symbol": "triangle-ne", "size": 12, "line": {"width": 1, "color": "black"}, "color": "green", }, hoverinfo="text", ), secondary_y=False, row=candlestick_row, col=column, ) if close_credit_supply_df is not None: fig.add_trace( go.Scatter( name="Close credit supply", mode="markers", x=close_credit_supply_df["timestamp"], y=close_credit_supply_df["price"], text=close_credit_supply_df["label"], marker={ "symbol": "triangle-sw", "size": 12, "line": {"width": 1, "color": "black"}, "color": "orange", }, hoverinfo="text", ), secondary_y=False, row=candlestick_row, col=column, ) return fig
[docs]def get_all_positions(state: State, pair_id): """Get all positions for a given pair""" assert type(pair_id) == int positions = [ p for p in state.portfolio.get_all_positions() if p.pair.get_pricing_pair().internal_id == pair_id ] return positions
[docs]def get_pair_name_from_first_trade(first_trade: TradeExecution): return ( f"{first_trade.pair.base.token_symbol} - {first_trade.pair.quote.token_symbol}" )
[docs]def get_pair_base_quote_names(state: State, pair_id: int | None): """Get all positions for the trading pair we want to visualise""" if pair_id: positions = get_all_positions(state, pair_id) else: positions = [] if len(positions) > 0: first_trade = positions[0].get_first_trade() else: first_trade = None if first_trade: pair_name = get_pair_name_from_first_trade(first_trade) pair = first_trade.pair base_token = pair.base.token_symbol quote_token = pair.quote.token_symbol else: pair_name = None base_token = None quote_token = None return pair_name, base_token, quote_token
def _get_title(name: str, title: str): if title is True: return name elif type(title) == str: return title else: return None def _get_axes_and_volume_text( axes: bool, pair_name: str | None, volume_axis_name: str = "Volume USD" ): """Get axes and volume text""" if axes: axes_text = pair_name volume_text = volume_axis_name else: axes_text = None volume_text = None return axes_text, volume_text
[docs]def get_all_text( state_name: str, axes: bool, title: str | None, pair_name: str | None, volume_axis_name: str, ): title_text = _get_title(state_name, title) axes_text, volume_text = _get_axes_and_volume_text( axes, pair_name, volume_axis_name ) return title_text, axes_text, volume_text
def _get_num_detached_indicators( plots: list[Plot], execution_context: ExecutionContext, volume_bar_mode: VolumeBarMode, detached_indicators: bool): """Get the number of detached technical indicators""" assert isinstance(execution_context, ExecutionContext) if detached_indicators: num_detached_indicators = sum( plot.kind == PlotKind.technical_indicator_detached for plot in plots ) else: num_detached_indicators = 0 if volume_bar_mode in {VolumeBarMode.hidden, VolumeBarMode.overlay}: pass elif volume_bar_mode == VolumeBarMode.separate: num_detached_indicators += 1 else: raise ValueError(f"Unknown volume bar mode {VolumeBarMode}") return num_detached_indicators def _get_plot_name_and_separator(plot: Plot) -> str: if plot.label == PlotLabel.hidden: return "" return f"<br> + {plot.name}" def _get_subplot_names( plots: list[Plot], execution_context: ExecutionContext, volume_bar_mode: VolumeBarMode, volume_axis_name: str = "Volume USD", pair_name: str = None, ): """Get subplot names for detached technical indicators. Overlaid names are appended to the detached plot name.""" assert isinstance(execution_context, ExecutionContext) if volume_bar_mode in {VolumeBarMode.hidden, VolumeBarMode.overlay}: subplot_names = [] detached_without_overlay_count = 0 else: subplot_names = [volume_axis_name] detached_without_overlay_count = 1 # for allowing multiple overlays on detached plots # list of detached plot names that already have overlays already_overlaid_names = [] for plot in plots: # get subplot names for detached technical indicators without any overlay if (plot.kind == PlotKind.technical_indicator_detached) and ( plot.name not in [ plot.detached_overlay_name for plot in plots if plot.kind == PlotKind.technical_indicator_overlay_on_detached ] ): subplot_names.append(plot.name) detached_without_overlay_count += 1 # get subplot names for detached technical indicators with overlay if plot.kind == PlotKind.technical_indicator_overlay_on_detached: # check that detached plot exists detached_plots = [ plot.name for plot in plots if plot.kind == PlotKind.technical_indicator_detached ] # Don't crash live trading due to visualisation bugs. # This can e.g. happen if the strategy code changes # and the state file contains data for old indicators if not execution_context.mode.is_live_trading(): assert ( plot.detached_overlay_name in detached_plots ), f"Overlay name {plot.detached_overlay_name} not in available detached plots {detached_plots}. Make sure there is a matching plot with the same name and kind PlotKind.technical_indicator_detached in the visualisation data." # check if another overlay exists if plot.detached_overlay_name in already_overlaid_names: # add to existing overlay subplot_names[ detached_without_overlay_count + already_overlaid_names.index(plot.detached_overlay_name) ] += _get_plot_name_and_separator(plot) else: # add to list subplot_names.append(plot.detached_overlay_name + _get_plot_name_and_separator(plot)) already_overlaid_names.append(plot.detached_overlay_name) # Insert blank name for main candle chart subplot_names.insert(0, pair_name) return subplot_names
[docs]def get_subplot_names( pair_name: str | None, volume_axis_name: str, volume_bar_mode: VolumeBarMode, detached_indicators: bool, plots: list[Plot] = None, execution_context: ExecutionContext = None, ) -> list[str]: """Get subplot names for detached technical indicators. Overlaid names are appended to the detached plot name. plots and execution_context are only needed if detached_indicators is True.""" if detached_indicators: # assert isinstance(plots, list), f"Expected list, got {type(plots)}. Shouldn't happen" assert isinstance(execution_context, ExecutionContext), f"Expected ExecutionContext, got {type(execution_context)}. Shouldn't happen" subplot_names = _get_subplot_names( plots, execution_context, volume_bar_mode, volume_axis_name, pair_name ) elif volume_bar_mode == VolumeBarMode.separate: subplot_names = [pair_name, volume_axis_name] else: subplot_names = [pair_name] return subplot_names
[docs]def get_num_detached_and_names( technical_indicators: bool, plots: list[Plot], execution_context: ExecutionContext, volume_bar_mode: VolumeBarMode, volume_text: str, pair_name: str | None, detached_indicators: bool, ): if technical_indicators: num_detached_indicators, subplot_names = _get_num_detached_and_names( plots, execution_context, volume_bar_mode, volume_text, pair_name, detached_indicators, ) else: num_detached_indicators, subplot_names = _get_num_detached_and_names_no_indicators(execution_context, volume_bar_mode, volume_text, pair_name) return num_detached_indicators, subplot_names
def _get_num_detached_and_names( plots: list[Plot], execution_context: ExecutionContext, volume_bar_mode: VolumeBarMode, volume_axis_name: str, pair_name: str | None = None, detached_indicators: bool = True, ): """Get num_detached_indicators and subplot_names""" assert isinstance(execution_context, ExecutionContext), f"Expected ExecutionContext, got {type(execution_context)}" num_detached_indicators = _get_num_detached_indicators(plots, execution_context, volume_bar_mode, detached_indicators) subplot_names = get_subplot_names(pair_name, volume_axis_name, volume_bar_mode, detached_indicators, plots, execution_context) return num_detached_indicators, subplot_names def _get_num_detached_and_names_no_indicators( execution_context: ExecutionContext, volume_bar_mode: VolumeBarMode, volume_axis_name: str, pair_name: str | None = None, ): """Get num_detached_indicators and subplot_names. Used when technical_indicators == False""" assert isinstance(execution_context, ExecutionContext) if volume_bar_mode == VolumeBarMode.separate: num_detached_indicators = 1 else: num_detached_indicators = 0 subplot_names = get_subplot_names(pair_name, volume_axis_name, volume_bar_mode, False) return num_detached_indicators, subplot_names