Source code for tradeexecutor.testing.simulated_execution_loop

"""Set up execution loop that connects to an Ethereum testing backend.

"""
import datetime
import queue

from eth_account.signers.local import LocalAccount
from eth_defi.hotwallet import HotWallet
from web3 import Web3

from tradeexecutor.cli.loop import ExecutionLoop
from tradeexecutor.ethereum.execution import EthereumExecution
from tradeexecutor.ethereum.hot_wallet_sync_model import EthereumHotWalletReserveSyncer, HotWalletSyncModel
from tradeexecutor.ethereum.tx import HotWalletTransactionBuilder
from tradeexecutor.state.state import State
from tradeexecutor.state.store import NoneStore
from tradeexecutor.strategy.approval import UncheckedApprovalModel
from tradeexecutor.strategy.cycle import CycleDuration
from tradeexecutor.strategy.execution_context import ExecutionContext, ExecutionMode
from tradeexecutor.strategy.generic.generic_pricing_model import GenericPricing
from tradeexecutor.strategy.generic.generic_router import GenericRouting
from tradeexecutor.strategy.generic.generic_valuation import GenericValuation
from tradeexecutor.strategy.pandas_trader.runner import PandasTraderRunner
from tradeexecutor.strategy.run_state import RunState
from tradeexecutor.strategy.strategy_module import DecideTradesProtocol
from tradeexecutor.strategy.universe_model import StaticUniverseModel, StrategyExecutionUniverse
from tradeexecutor.strategy.trading_strategy_universe import TradingStrategyUniverse
from tradeexecutor.utils.timer import timed_task

from tradeexecutor.ethereum.uniswap_v2.uniswap_v2_execution import UniswapV2Execution
from tradeexecutor.ethereum.uniswap_v2.uniswap_v2_live_pricing import UniswapV2LivePricing
from tradeexecutor.ethereum.uniswap_v2.uniswap_v2_routing import UniswapV2Routing
from tradeexecutor.ethereum.uniswap_v2.uniswap_v2_valuation import UniswapV2PoolRevaluator

from tradeexecutor.ethereum.uniswap_v3.uniswap_v3_execution import UniswapV3Execution
from tradeexecutor.ethereum.uniswap_v3.uniswap_v3_live_pricing import UniswapV3LivePricing
from tradeexecutor.ethereum.uniswap_v3.uniswap_v3_routing import UniswapV3Routing
from tradeexecutor.ethereum.uniswap_v3.uniswap_v3_valuation import UniswapV3PoolRevaluator

from tradeexecutor.ethereum.one_delta.one_delta_execution import OneDeltaExecution
from tradeexecutor.ethereum.one_delta.one_delta_live_pricing import OneDeltaLivePricing
from tradeexecutor.ethereum.one_delta.one_delta_routing import OneDeltaRouting
from tradeexecutor.ethereum.one_delta.one_delta_valuation import OneDeltaPoolRevaluator


def set_up_simulated_execution_loop_uniswap_v2(
    *ignore,
    web3: Web3,
    decide_trades: DecideTradesProtocol,
    universe: StrategyExecutionUniverse,
    routing_model: UniswapV2Routing,
    state: State,
    wallet_account: LocalAccount,
) -> ExecutionLoop:
    """Build a simulated execution loop trading on Uniswap V2.

    Create a strategy execution that connects to in-memory blockchain
    simulation. This allows us to step through trades block by block and
    have strategies respond to price action (e.g. stop loss).

    See `test_uniswap_live_stop_loss.py` for an example.

    :param web3:
        Connection handed to the sync model, the transaction builder and
        the live pricing model.

    :param decide_trades:
        Strategy decision function passed to the runner.

    :param universe:
        Trading universe. It is wrapped in a static universe model and its
        ``reserve_assets`` are used to set up the sync model.

    :param routing_model:
        Must be a :py:class:`UniswapV2Routing` instance.

    :param state:
        Strategy state. It is wrapped in a ``NoneStore`` and passed to the
        sync model set up.

    :param wallet_account:
        A trader account with some deployed money

    :raise TypeError:
        If any positional argument is given.

    :return:
        Execution loop you can manually poke forward tick by tick,
        block by block.
    """
    # Positional arguments are collected only so that they can be rejected,
    # see https://www.python.org/dev/peps/pep-3102/
    if len(ignore) > 0:
        raise TypeError("Only keyword arguments accepted")

    assert isinstance(wallet_account, LocalAccount)
    assert isinstance(routing_model, UniswapV2Routing)

    simulated_context = ExecutionContext(mode=ExecutionMode.simulated_trading)

    # The loop works on the caller-supplied state wrapped in a NoneStore
    state_store = NoneStore(state)

    wallet = HotWallet(wallet_account)
    wallet_sync = HotWalletSyncModel(web3, wallet)
    wallet_sync.setup_all(state, list(universe.reserve_assets))

    static_universe = StaticUniverseModel(universe)

    uniswap_v2_execution = UniswapV2Execution(
        HotWalletTransactionBuilder(web3, wallet),
        max_slippage=1.00,
        confirmation_block_count=0,  # Must be zero for the test chain
    )

    def build_pricing_model(execution_model, universe, routing_model):
        """Pricing model factory for single Uni v2 exchange."""
        pairs = universe.data_universe.pairs
        return UniswapV2LivePricing(web3, pairs, routing_model)

    def build_valuation_model(pricing_model):
        """Valuation model factory for single Uni v2 exchange."""
        return UniswapV2PoolRevaluator(pricing_model)

    strategy_runner = PandasTraderRunner(
        timed_task_context_manager=timed_task,
        execution_model=uniswap_v2_execution,
        approval_model=UncheckedApprovalModel(),
        valuation_model_factory=build_valuation_model,
        sync_model=wallet_sync,
        pricing_model_factory=build_pricing_model,
        routing_model=routing_model,
        decide_trades=decide_trades,
        execution_context=simulated_context,
        trade_settle_wait=datetime.timedelta(seconds=1),
        unit_testing=True,
    )

    simulated_loop = ExecutionLoop(
        name="simulated_execution",
        command_queue=queue.Queue(),
        execution_context=simulated_context,
        execution_model=uniswap_v2_execution,
        sync_model=wallet_sync,
        pricing_model_factory=build_pricing_model,
        valuation_model_factory=build_valuation_model,
        strategy_factory=None,
        store=state_store,
        cycle_duration=CycleDuration.cycle_unknown,
        client=None,
        approval_model=UncheckedApprovalModel(),
        stats_refresh_frequency=datetime.timedelta(0),
        position_trigger_check_frequency=datetime.timedelta(0),
    )

    simulated_loop.init_simulation(static_universe, strategy_runner)
    return simulated_loop
def set_up_simulated_execution_loop_uniswap_v3(
    *ignore,
    web3: Web3,
    decide_trades: DecideTradesProtocol,
    universe: StrategyExecutionUniverse,
    routing_model: UniswapV3Routing,
    state: State,
    wallet_account: LocalAccount,
) -> ExecutionLoop:
    """Set up a simulated execution loop for Uniswap V3.

    Create a strategy execution that connects to in-memory blockchain
    simulation. This allows us to step through trades block by block and
    have strategies respond to price action (e.g. stop loss).

    See `test_uniswap_live_stop_loss_uniswap_v3.py` for an example.

    :param web3:
        Connection handed to the sync model, the transaction builder and
        the live pricing model.

    :param decide_trades:
        Strategy decision function passed to the runner.

    :param universe:
        Trading universe, wrapped in a static universe model.

    :param routing_model:
        Must be a :py:class:`UniswapV3Routing` instance.

    :param state:
        Strategy state, wrapped in a ``NoneStore`` for the loop.

    :param wallet_account:
        Local account used to construct the hot wallet that signs
        transactions.

    :raise TypeError:
        If any positional argument is given.

    :return:
        Execution loop you can manually poke forward tick by tick,
        block by block.
    """
    if ignore:
        # Positional arguments are collected only so that they can be
        # rejected, see https://www.python.org/dev/peps/pep-3102/
        raise TypeError("Only keyword arguments accepted")

    assert isinstance(wallet_account, LocalAccount)
    assert isinstance(routing_model, UniswapV3Routing)

    execution_context = ExecutionContext(
        mode=ExecutionMode.simulated_trading,
    )

    # The loop works on the caller-supplied state wrapped in a NoneStore
    store = NoneStore(state)

    hot_wallet = HotWallet(wallet_account)
    sync_model = HotWalletSyncModel(web3, hot_wallet)

    cycle_duration = CycleDuration.cycle_unknown

    universe_model = StaticUniverseModel(universe)

    tx_builder = HotWalletTransactionBuilder(web3, hot_wallet)

    execution_model = UniswapV3Execution(
        tx_builder,
        max_slippage=1.00,
        confirmation_block_count=0,  # Must be zero for the test chain
    )

    # Pricing model factory for single Uni v3 exchange
    def pricing_model_factory(execution_model, universe: StrategyExecutionUniverse, routing_model):
        # Read the pairs through data_universe, the same accessor the
        # Uniswap V2 set up in this module uses, instead of the older
        # universe.universe spelling.
        return UniswapV3LivePricing(
            web3,
            universe.data_universe.pairs,
            routing_model)

    # Valuation model factory for single Uni v3 exchange
    def valuation_model_factory(pricing_model):
        return UniswapV3PoolRevaluator(pricing_model)

    runner = PandasTraderRunner(
        timed_task_context_manager=timed_task,
        execution_model=execution_model,
        approval_model=UncheckedApprovalModel(),
        valuation_model_factory=valuation_model_factory,
        sync_model=sync_model,
        pricing_model_factory=pricing_model_factory,
        routing_model=routing_model,
        decide_trades=decide_trades,
        execution_context=execution_context,
        trade_settle_wait=datetime.timedelta(seconds=1),
        unit_testing=True,
    )

    loop = ExecutionLoop(
        name="simulated_execution",
        command_queue=queue.Queue(),
        execution_context=execution_context,
        execution_model=execution_model,
        sync_model=sync_model,
        pricing_model_factory=pricing_model_factory,
        valuation_model_factory=valuation_model_factory,
        strategy_factory=None,
        store=store,
        cycle_duration=cycle_duration,
        client=None,
        approval_model=UncheckedApprovalModel(),
        stats_refresh_frequency=datetime.timedelta(0),
        position_trigger_check_frequency=datetime.timedelta(0),
    )

    loop.init_simulation(
        universe_model,
        runner,
    )

    return loop
def set_up_simulated_execution_loop_one_delta(
    *,
    web3: Web3,
    decide_trades: DecideTradesProtocol,
    universe: StrategyExecutionUniverse,
    routing_model: OneDeltaRouting,
    state: State,
    wallet_account = None,
) -> ExecutionLoop:
    """Set up a simulated execution loop for 1delta.

    Create a strategy execution that connects to in-memory blockchain
    simulation. This allows us to step through trades block by block and
    have strategies respond to price action (e.g. stop loss).

    See `test_one_delta_live_short.py` for an example.

    :param web3:
        Connection handed to the sync model, the transaction builder and
        the live pricing model.

    :param decide_trades:
        Strategy decision function passed to the runner.

    :param universe:
        Trading universe. Must be a :py:class:`TradingStrategyUniverse`.

    :param routing_model:
        Must be a :py:class:`OneDeltaRouting` instance.

    :param state:
        Strategy state, wrapped in a ``NoneStore`` for the loop.

    :param wallet_account:
        Account handed to :py:class:`HotWallet` as is; its type is not
        validated here.

    :return:
        Execution loop you can manually poke forward tick by tick,
        block by block.
    """
    assert isinstance(routing_model, OneDeltaRouting)

    execution_context = ExecutionContext(
        mode=ExecutionMode.simulated_trading,
        engine_version="0.3",
    )

    # The loop works on the caller-supplied state wrapped in a NoneStore
    store = NoneStore(state)

    # NOTE(review): wallet_account defaults to None and is not type checked,
    # so HotWallet may be built around None - presumably callers always
    # pass a real account; confirm against the tests.
    hot_wallet = HotWallet(wallet_account)
    sync_model = HotWalletSyncModel(web3, hot_wallet)

    cycle_duration = CycleDuration.cycle_unknown

    assert isinstance(universe, TradingStrategyUniverse)
    universe_model = StaticUniverseModel(universe)

    tx_builder = HotWalletTransactionBuilder(web3, hot_wallet)

    execution_model = OneDeltaExecution(
        tx_builder,
        max_slippage=1.00,
        mainnet_fork=True,
        confirmation_block_count=0,
    )

    def pricing_model_factory(execution_model, universe: StrategyExecutionUniverse, routing_model):
        # Read the pairs through data_universe, the same accessor the
        # Uniswap V2 set up in this module uses, instead of the older
        # universe.universe spelling.
        return OneDeltaLivePricing(
            web3,
            universe.data_universe.pairs,
            routing_model
        )

    def valuation_model_factory(pricing_model):
        return OneDeltaPoolRevaluator(pricing_model)

    runner = PandasTraderRunner(
        timed_task_context_manager=timed_task,
        execution_model=execution_model,
        approval_model=UncheckedApprovalModel(),
        valuation_model_factory=valuation_model_factory,
        sync_model=sync_model,
        pricing_model_factory=pricing_model_factory,
        routing_model=routing_model,
        decide_trades=decide_trades,
        execution_context=execution_context,
        unit_testing=True,
        trade_settle_wait=datetime.timedelta(seconds=1),
    )

    loop = ExecutionLoop(
        name="simulated_execution",
        command_queue=queue.Queue(),
        execution_context=execution_context,
        execution_model=execution_model,
        sync_model=sync_model,
        pricing_model_factory=pricing_model_factory,
        valuation_model_factory=valuation_model_factory,
        strategy_factory=None,
        store=store,
        cycle_duration=cycle_duration,
        client=None,
        approval_model=UncheckedApprovalModel(),
        stats_refresh_frequency=datetime.timedelta(0),
        position_trigger_check_frequency=datetime.timedelta(0),
    )

    loop.init_simulation(universe_model, runner)

    return loop
def set_up_simulated_ethereum_generic_execution(
    *,
    web3: Web3,
    decide_trades: DecideTradesProtocol,
    universe: StrategyExecutionUniverse,
    routing_model: GenericRouting,
    pricing_model: GenericPricing,
    valuation_model: GenericValuation,
    state: State,
    hot_wallet: HotWallet,
) -> ExecutionLoop:
    """Build a simulated execution loop that uses generic routing.

    Unlike a per-exchange set up, the pricing and valuation models are
    supplied ready-made by the caller; the factories given to the runner
    and the loop simply hand those instances back.

    :param web3:
        Connection handed to the sync model and the transaction builder.

    :param decide_trades:
        Strategy decision function passed to the runner.

    :param universe:
        Trading universe. Must be a :py:class:`TradingStrategyUniverse`.

    :param routing_model:
        Must be a :py:class:`GenericRouting` instance.

    :param pricing_model:
        Must be a :py:class:`GenericPricing` instance.

    :param valuation_model:
        Valuation model returned by the valuation factory.

    :param state:
        Strategy state, wrapped in a ``NoneStore`` for the loop.

    :param hot_wallet:
        Must be a :py:class:`HotWallet` instance.

    :return:
        The initialised execution loop.
    """
    assert isinstance(routing_model, GenericRouting)
    assert isinstance(pricing_model, GenericPricing)
    assert isinstance(hot_wallet, HotWallet)

    simulated_context = ExecutionContext(
        mode=ExecutionMode.simulated_trading,
        engine_version="0.3",
    )

    # The loop works on the caller-supplied state wrapped in a NoneStore
    state_store = NoneStore(state)

    wallet_sync = HotWalletSyncModel(web3, hot_wallet)

    assert isinstance(universe, TradingStrategyUniverse)
    static_universe = StaticUniverseModel(universe)

    generic_execution = EthereumExecution(
        HotWalletTransactionBuilder(web3, hot_wallet),
        max_slippage=1.00,
        mainnet_fork=True,
        confirmation_block_count=0,
    )

    # Both factories close over the instances the caller supplied; the
    # arguments they are called with are ignored.
    supplied_pricing_model = pricing_model
    supplied_valuation_model = valuation_model

    def use_supplied_pricing(execution_model, universe: StrategyExecutionUniverse, routing_model):
        return supplied_pricing_model

    def use_supplied_valuation(pricing_model):
        return supplied_valuation_model

    strategy_runner = PandasTraderRunner(
        timed_task_context_manager=timed_task,
        execution_model=generic_execution,
        approval_model=UncheckedApprovalModel(),
        valuation_model_factory=use_supplied_valuation,
        sync_model=wallet_sync,
        pricing_model_factory=use_supplied_pricing,
        routing_model=routing_model,
        decide_trades=decide_trades,
        execution_context=simulated_context,
        unit_testing=True,
        run_state=RunState(),
        trade_settle_wait=datetime.timedelta(seconds=1),
    )

    simulated_loop = ExecutionLoop(
        name="simulated_execution",
        command_queue=queue.Queue(),
        execution_context=simulated_context,
        execution_model=generic_execution,
        sync_model=wallet_sync,
        pricing_model_factory=use_supplied_pricing,
        valuation_model_factory=use_supplied_valuation,
        strategy_factory=None,
        store=state_store,
        cycle_duration=CycleDuration.cycle_unknown,
        client=None,
        approval_model=UncheckedApprovalModel(),
        stats_refresh_frequency=datetime.timedelta(0),
        position_trigger_check_frequency=datetime.timedelta(0),
    )

    simulated_loop.init_simulation(static_universe, strategy_runner)
    return simulated_loop