# Source code for tradeexecutor.cli.close_position

"""Closing all positions externally.

Code to clean up positions or force a shutdown.
"""
import logging
import datetime

from tabulate import tabulate
from web3 import Web3

from tradeexecutor.analysis.position import display_positions
from tradeexecutor.ethereum.enzyme.vault import EnzymeVaultSyncModel
from tradeexecutor.ethereum.onchain_balance import fetch_address_balances
from tradeexecutor.strategy.execution_context import ExecutionContext
from tradeexecutor.strategy.sync_model import SyncModel
from tradeexecutor.strategy.valuation import ValuationModel
from tradeexecutor.strategy.valuation_update import update_position_valuations
from tradingstrategy.types import Percent
from tradeexecutor.state.state import State
from tradeexecutor.strategy.execution_model import ExecutionModel
from tradeexecutor.strategy.pandas_trader.position_manager import PositionManager
from tradeexecutor.strategy.pricing_model import PricingModel
from tradeexecutor.strategy.routing import RoutingModel, RoutingState
from tradeexecutor.strategy.trading_strategy_universe import TradingStrategyUniverse


# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


class CloseAllAborted(Exception):
    """Raised when the user declines the interactive confirmation prompt.

    Signals that the close operation was cancelled on purpose rather
    than having failed.
    """


def close_single_or_all_positions(
    web3: Web3,
    execution_model: ExecutionModel,
    execution_context: ExecutionContext,
    pricing_model: PricingModel,
    sync_model: SyncModel,
    state: State,
    universe: TradingStrategyUniverse,
    routing_model: RoutingModel,
    routing_state: RoutingState,
    valuation_model: ValuationModel,
    slippage_tolerance: Percent,
    interactive: bool = True,
    position_id: int | None = None,
    unit_testing: bool = False,
    close_by_sell: bool = True,
    blacklist_marked_down: bool = True,
) -> None:
    """Close single/all positions.

    - CLI entry point

    - Sync reserves before starting

    - Close any open positions

    - Display trade execution and position report afterwards

    :param web3:
        Connection used to read the hot wallet native currency balance
        before and after the operation.

    :param execution_model:
        Initialised at the start and used to execute the closing trades.

    :param execution_context:
        Passed through to the position revaluation step.

    :param pricing_model:
        Passed to :py:class:`PositionManager` and to the position sync.

    :param sync_model:
        Used to sync the treasury (and positions, if supported),
        to resolve the vault/hot wallet addresses and to read onchain balances.

    :param state:
        The strategy state. Modified in place.

    :param universe:
        Trading universe providing the reserve assets and the data universe.

    :param routing_model:
        Passed to the trade execution.

    :param routing_state:
        Passed to the revaluation step and to the trade execution.

    :param valuation_model:
        Passed to the position revaluation step.

    :param slippage_tolerance:
        Default slippage tolerance given to :py:class:`PositionManager`.

    :param interactive:
        Ask a y/n confirmation with ``input()`` before touching any position.

    :param position_id:
        Close only this position, looked up first from the open and then
        from the frozen positions. ``None`` closes all open positions.

    :param unit_testing:
        Forces the revaluation code path even if the sync model
        does not report async deposits.

    :param close_by_sell:
        ``True`` executes a closing trade for each position.
        ``False`` marks the positions down and moves them to
        the closed positions without trading.

    :param blacklist_marked_down:
        When marking down, also blacklist the base asset of the position.

    :raise CloseAllAborted:
        The user did not answer ``y`` to the interactive confirmation.

    :raise RuntimeError:
        The state has no reserves, or ``position_id`` does not match
        any open or frozen position.

    :raise AssertionError:
        Argument checks failed, there is nothing to close, position
        quantities are inconsistent, or a closing trade failed.
    """

    assert isinstance(sync_model, SyncModel)
    assert isinstance(universe, TradingStrategyUniverse)
    assert isinstance(valuation_model, ValuationModel)
    assert isinstance(execution_context, ExecutionContext)

    if position_id is not None:
        assert type(position_id) is int, f"Got: {position_id} {type(position_id)}"
        assert position_id >= 0

    ts = datetime.datetime.utcnow()

    # Sync nonce for the hot wallet
    execution_model.initialize()

    logger.info("Sync model is %s", sync_model)
    logger.info("Trading university reserve asset is %s", universe.get_reserve_asset())

    # Use unit_testing flag so this code path is easier to check
    if sync_model.has_async_deposits() or unit_testing:
        logger.info("Vault must be revalued before proceeding, using: %s", sync_model.__class__.__name__)
        update_position_valuations(
            timestamp=ts,
            state=state,
            universe=universe,
            execution_context=execution_context,
            routing_state=routing_state,
            valuation_model=valuation_model,
            long_short_metrics_latest=None,
        )

    # Sync any incoming stablecoin transfers
    # that have not been synced yet
    balance_updates = sync_model.sync_treasury(
        ts,
        state,
        list(universe.reserve_assets),
        post_valuation=True,
    )

    logger.info("We received balance update events: %s", balance_updates)

    # Velvet capital code path
    if sync_model.has_position_sync():
        sync_model.sync_positions(
            ts,
            state,
            universe,
            pricing_model
        )

    vault_address = sync_model.get_key_address()
    hot_wallet = sync_model.get_hot_wallet()
    gas_at_start = hot_wallet.get_native_currency_balance(web3)

    logger.info("Account data before starting to close all")
    logger.info(" Vault address: %s", vault_address)
    logger.info(" Hot wallet address: %s", hot_wallet.address)
    logger.info(" Hot wallet balance: %s", gas_at_start)

    if isinstance(sync_model, EnzymeVaultSyncModel):
        vault = sync_model.vault
        logger.info(" Comptroller address: %s", vault.comptroller.address)
        logger.info(" Vault owner: %s", vault.vault.functions.getOwner().call())
        sync_model.check_ownership()

    if len(state.portfolio.reserves) == 0:
        raise RuntimeError("No reserves detected for the strategy. Does your wallet/vault have USDC deposited for trading?")

    reserve_currency = state.portfolio.get_default_reserve_position().asset.token_symbol
    reserve_currency_at_start = state.portfolio.get_default_reserve_position().get_value()

    logger.info(" Reserve currency balance: %s %s", reserve_currency_at_start, reserve_currency)

    assert reserve_currency_at_start > 0, f"No deposits available to trade. Vault at {vault_address}"

    # Create PositionManager helper class
    # that helps open and close positions
    position_manager = PositionManager(
        ts,
        universe.data_universe,
        state,
        pricing_model,
        default_slippage_tolerance=slippage_tolerance,
    )

    # Resolve which positions this run is going to close:
    # every open position, or the single one identified by position_id
    open_positions = list(state.portfolio.open_positions.values())

    if position_id is None:
        logger.info("Performing close-all for %d open positions", len(open_positions))
        positions_to_close = list(open_positions)
    else:
        logger.info("Performing close-position for position #%d", position_id)

        if position_id in state.portfolio.open_positions:
            positions_to_close = [state.portfolio.open_positions[position_id]]
        elif position_id in state.portfolio.frozen_positions:
            positions_to_close = [state.portfolio.frozen_positions[position_id]]
        else:
            raise RuntimeError(f"Position #{position_id} does not exist")

    # Pre-flight check: refuse to proceed if any position has quantity
    # that is not available for trading
    for p in positions_to_close:
        logger.info(" Position: %s, quantity %s", p, p.get_quantity())
        trading_quantity = p.get_available_trading_quantity()
        quantity = p.get_quantity()

        onchain_fetch_data = sync_model.fetch_onchain_balances(
            [p.pair.base],
            filter_zero=False,
        )
        onchain_balance = next(iter(onchain_fetch_data))

        if trading_quantity != quantity:
            # Dump diagnostics before the assert below fires
            logger.info(
                "Position #%d quantity: %f, available for trade quantity: %f, onchain quantity: %f",
                p.position_id,
                quantity,
                trading_quantity,
                onchain_balance.amount,
            )
            for t in p.trades.values():
                logger.info("Trade %s, quantity: %s", t, quantity)

        assert trading_quantity == quantity, (f"Position quantity vs. available trading quantity mismatch.\n"
                                              f"Probably unexecuted trades? {quantity} vs. {trading_quantity}\n"
                                              f"Position: {p}")

    if interactive:

        if close_by_sell:
            logger.info("We will attempt to close the positions by selling")
        else:
            logger.info("We will mark positions to zero")

        confirmation = input("Attempt to close positions [y/n]").lower()
        if confirmation != "y":
            raise CloseAllAborted()

    portfolio = state.portfolio

    assert len(positions_to_close) > 0, "Strategy does not have any open positions to close"

    if close_by_sell:

        for p in positions_to_close:
            # The message left on the positions that were closed
            note = f"Close sell with CLI command at {datetime.datetime.utcnow()}"

            # Create trades to close the position
            logger.info("Closing position %s", p)

            assert not p.is_closed(), "Was already closed"

            trades = position_manager.close_position(p)

            assert len(trades) == 1
            trade = trades[0]

            # Compose the trades as approve() + swapTokenExact(),
            # broadcast them to the blockchain network and
            # wait for the confirmation
            execution_model.execute_trades(
                ts,
                state,
                trades,
                routing_model,
                routing_state,
            )

            if not trade.is_success():
                logger.error("Trade failed: %s", trade)
                logger.error("Tx hash: %s", trade.blockchain_transactions[-1].tx_hash)
                logger.error("Revert reason: %s", trade.blockchain_transactions[-1].revert_reason)
                logger.error("Trade dump:\n%s", trade.get_debug_dump())
                raise AssertionError("Trade to close position failed")

            # Make sure there is a notes string to append to
            if p.notes is None:
                p.notes = ""

            p.add_notes_message(note)

    else:
        # TODO: Add accounting correction
        # TODO: Add blacklist not to touch this position again
        for p in positions_to_close:

            assert not p.is_closed(), "Was already closed"

            # Move the position from its current bucket to closed positions
            if p.is_frozen():
                del portfolio.frozen_positions[p.position_id]
            elif p.is_open():
                del portfolio.open_positions[p.position_id]
            else:
                raise NotImplementedError(f"Cannot mark down closed position: {p}")

            portfolio.closed_positions[p.position_id] = p
            p.mark_down()
            logger.info("Position was marked down and moved to closed positions: %s", p)

            # Also add to the blacklist
            if blacklist_marked_down:
                state.blacklist_asset(p.pair.base)

    # Verify the end state regardless of which close method was used
    for p in positions_to_close:
        assert p.is_closed(), f"Failed to close position: {p}"
        assert p.position_id in portfolio.closed_positions, f"Position was not in closed positions: {p}"
        assert p.position_id not in portfolio.frozen_positions, f"Position was back in frozen positions: {p}"

    gas_at_end = hot_wallet.get_native_currency_balance(web3)
    reserve_currency_at_end = state.portfolio.get_default_reserve_position().get_value()

    logger.info("Trade report")
    logger.info(" Gas spent: %s", gas_at_start - gas_at_end)
    logger.info(" Trades done currently: %d", len(list(state.portfolio.get_all_trades())))
    logger.info(" Reserves currently: %s %s", reserve_currency_at_end, reserve_currency)
    logger.info(" Reserve currency spent: %s %s", reserve_currency_at_start - reserve_currency_at_end, reserve_currency)

    # Report the positions this run closed. The loop above guarantees none
    # of them are in frozen_positions any more, so listing frozen positions
    # here would show either nothing or unrelated positions.
    df = display_positions(positions_to_close)
    position_info = tabulate(df, headers='keys', tablefmt='rounded_outline')

    logger.info("Position data for positions that were closed:\n%s", position_info)