Source code for tradingstrategy.direct_feed.synthetic_feed

"""Synthetic random price data feed for unit testing."""

import random
from decimal import Decimal
from typing import Dict, Optional, List, Iterable, Type

import pandas as pd
from tqdm_loggable.auto import tqdm

from eth_defi.event_reader.block_header import BlockHeader
from eth_defi.price_oracle.oracle import BasePriceOracle
from eth_defi.event_reader.reorganisation_monitor import ReorganisationMonitor

from .timeframe import Timeframe
from .trade_feed import TradeFeed, Trade
from .direct_feed_pair import PairId


[docs]class SyntheticTradeFeed(TradeFeed): """Synthetic trade feed that produces random trades."""
[docs] def __init__(self, pairs: List[PairId], oracles: Dict[PairId, BasePriceOracle], reorg_mon: ReorganisationMonitor, data_retention_time: Optional[pd.Timedelta] = None, random_seed = 1, start_price_range=150, end_price_range=300, min_trades_per_block=0, max_trades_per_block=10, min_amount=50, max_amount=50, price_movement_per_trade=2.5, timeframe: Timeframe = Timeframe("1min"), prices: Dict[PairId, float] = None, broken_wick_block_frequency=0, ): super().__init__( pairs=pairs, oracles=oracles, reorg_mon=reorg_mon, data_retention_time=data_retention_time, timeframe=timeframe, ) self.pairs = pairs self.min_trades_per_block = min_trades_per_block self.max_trades_per_block = max_trades_per_block self.min_amount = min_amount self.max_amount = max_amount self.price_movement_per_trade = price_movement_per_trade self.broken_wick_block_frequency = broken_wick_block_frequency self.random_gen = random.Random(random_seed) if not prices: self.prices = { p: self.random_gen.randint(start_price_range, end_price_range) for p in pairs } else: self.prices = prices
[docs] def fetch_trades(self, start_block: int, end_block: Optional[int], tqdm: Optional[Type[tqdm]] = None) -> Iterable[Trade]: """Generate few random trades per block per pair.""" block_data = {b.block_number: b for b in self.reorg_mon.fetch_block_data(start_block, end_block)} max_blocks = end_block - start_block if max_blocks > 5 and tqdm: progress_bar = tqdm(total=max_blocks) else: progress_bar = None for block_num in range(start_block, end_block + 1): for p in self.pairs: trades_per_block = self.random_gen.randint(self.min_trades_per_block, self.max_trades_per_block) for trade_idx in range(trades_per_block): self.prices[p] += self.random_gen.uniform(-self.price_movement_per_trade, self.price_movement_per_trade) self.prices[p] = max(self.prices[p], 0.00001) # Don't go to negative prices price = self.prices[p] # Simulate broken data by inserting prices 1000x out of range # See test_filter_wick.py if self.broken_wick_block_frequency: if self.random_gen.randint(0, self.broken_wick_block_frequency) == 0: if self.random_gen.randint(0, 2) == 1: price *= 1000 else: price /= 1000 amount = self.random_gen.uniform(self.min_amount, self.max_amount) block: BlockHeader = block_data[block_num] tx_hash = hex(self.random_gen.randint(2**31, 2**32)) log_index = trade_idx exchange_rate = self.get_exchange_rate(p) yield Trade( pair=p, block_number=block_num, block_hash=block.block_hash, timestamp=pd.Timestamp.utcfromtimestamp(block.timestamp).tz_localize(None), tx_hash=tx_hash, log_index=log_index, price=Decimal(price), amount=Decimal(amount), exchange_rate=exchange_rate, ) if progress_bar: progress_bar.update(1) if progress_bar: progress_bar.close()