TradeFeed#
tradingstrategy.direct_feed.trade_feed.TradeFeed Python class in Trading Strategy framework.
- class TradeFeed[source]#
Bases:
object
Real-time blockchain trade feed.
In-memory latency optimised OHLCv producer for on-chain trades.
Keep events in RAM
Generate candles based on events
Gracefully handle chain reorganisations and drop stale data
- __init__(pairs, oracles, reorg_mon, timeframe=<Timeframe freq:1min offset:0 days 00:00:00>, data_retention_time=None, save_hook=None)[source]#
Create new real-time OHLCV tracker.
- Parameters:
pairs (List[str]) – List of pool addresses, or list of similar identifiers.
oracles (Dict[str, BasePriceOracle]) –
Reference prices for converting ETH or other crypto quoted prices to US dollars.
In the form of pair -> Price oracle maps.
reorg_mon (ReorganisationMonitor) – Reorganisation detector and last good block state for a chain.
timeframe (Timeframe) –
Expected timeframe of the data. Any trade deltas are snapped to the exact timeframe, so that when candle data gets updated, we always update only full candles and never return trade data that would break the timeframe in a middle.
Default to one minute candles.
data_retention_time (Optional[Timedelta]) – Discard entries older than this to avoid filling the RAM.
save_hook (Optional[SaveHook]) – Sync the downloaded data to disk.
Methods
__init__
(pairs, oracles, reorg_mon[, ...])Create new real-time OHLCV tracker.
add_trades
(trades[, start_block, end_block])Add trade to the ring buffer with support for fixing chain reorganisations.
backfill_buffer
(block_count[, tqdm, save_hook])Populate the backbuffer before starting real-time tracker.
check_correct_block_range
(df, start_block, ...)Check that trades in the given DataFrame are for correct block range.
Check for duplicate trades.
Check data for duplicate trades.
check_enough_history
(required_duration[, ...])Check that the dafa we have is good time window wise.
Check if any of block data has changed
fetch_trades
(start_block, end_block[, tqdm])Read data from the chain.
Find the first block number that contains data going into the candle.
Get the last block number for which we have good data.
get_exchange_rate
(pair)Get the current exchange rate for the pair.
get_latest_price
(pair)Return the latest price of a pair.
get_latest_trades
(n[, pair])Returns the latest trades.
How many trades we track currently.
perform_duty_cycle
([verbose])Update the candle data
restore
(df)Restore data from the previous disk save.
to_pandas
(partition_size)truncate_reorganised_data
(latest_good_block)Discard data because of the chain reorg.
update_cycle
(start_block, end_block, ...)Update the internal work buffer.
- __init__(pairs, oracles, reorg_mon, timeframe=<Timeframe freq:1min offset:0 days 00:00:00>, data_retention_time=None, save_hook=None)[source]#
Create new real-time OHLCV tracker.
- Parameters:
pairs (List[str]) – List of pool addresses, or list of similar identifiers.
oracles (Dict[str, BasePriceOracle]) –
Reference prices for converting ETH or other crypto quoted prices to US dollars.
In the form of pair -> Price oracle maps.
reorg_mon (ReorganisationMonitor) – Reorganisation detector and last good block state for a chain.
timeframe (Timeframe) –
Expected timeframe of the data. Any trade deltas are snapped to the exact timeframe, so that when candle data gets updated, we always update only full candles and never return trade data that would break the timeframe in a middle.
Default to one minute candles.
data_retention_time (Optional[Timedelta]) – Discard entries older than this to avoid filling the RAM.
save_hook (Optional[SaveHook]) – Sync the downloaded data to disk.
- add_trades(trades, start_block=None, end_block=None)[source]#
Add trade to the ring buffer with support for fixing chain reorganisations.
Transactions may hop between different blocks when the chain tip reorganises, getting a new timestamp. In this case, we update the
Note
It is safe to call this function multiple times for the same event.
- Parameters:
- Returns:
DataFrame of new trades
- Raises:
ChainReorganisationDetected – If we have detected a block reorganisation during importing the data
- Return type:
- get_latest_trades(n, pair=None)[source]#
Returns the latest trades.
These trades will be across all trading pairs we are monitoring.
- get_latest_price(pair)[source]#
Return the latest price of a pair.
Return the last price record we have.
- truncate_reorganised_data(latest_good_block)[source]#
Discard data because of the chain reorg.
- Parameters:
latest_good_block – The last block that we cannot discard.
- check_current_trades_for_duplicates()[source]#
Check for duplicate trades.
Internal sanity check - should not happen.
Dump debug output to error logger if happens.
- Raises:
AssertionError – Should not happen
- check_enough_history(required_duration, now_=None, tolerance=1.0)[source]#
Check that the dafa we have is good time window wise.
Internal sanity check - should not happen.
Dump debug output to error logger if happens.
- Parameters:
required_duration (Timedelta) – How far back we need to have data in our buffer.
now – UTC timestamp what’s the current time
tolerance –
How much tolerance we need.
Default to 100%, no forgiveness for any lack of data.
There are several reasons for data mismatch, notably being that we estimate blockchain timepoints using block ranges with expected block time which is not always stable.
- Raises:
AssertionError – If we do not have old enough data
- static check_duplicates_data_frame(df)[source]#
Check data for duplicate trades.
Bugs in the event reader system may cause duplicate trades
All trades are uniquely identified by tx_hash and log_index
In a perfectly working system duplicate trades do not happen
- Parameters:
df (DataFrame) – Input trades
- Raises:
AssertionError – Should not happen
- static check_correct_block_range(df, start_block, end_block)[source]#
Check that trades in the given DataFrame are for correct block range.
Bugs in the event reader system may cause duplicate trades
All trades are uniquely identified by tx_hash and log_index
In a perfectly working system duplicate trades do not happen
- Parameters:
- Raises:
AssertionError – Should not happen
- check_reorganisations_and_purge()[source]#
Check if any of block data has changed
- Returns:
Start block since we need to read new data (inclusive).
- Return type:
- backfill_buffer(block_count, tqdm=None, save_hook=None)[source]#
Populate the backbuffer before starting real-time tracker.
- Parameters:
- Returns:
Data loaded and filled to the work buffer.
- Return type:
- get_exchange_rate(pair)[source]#
Get the current exchange rate for the pair.
There is no block number parameter; We always assume the data is filled in order, block by block, so the exchange rate is always the exchange rate of the current block.
Price oracles are updates per-block before trade feed for pairs, so price orocles can calculate TWAP or similar exchange rate for the block.
- find_first_included_block_in_candle(ts)[source]#
Find the first block number that contains data going into the candle.
- update_cycle(start_block, end_block, reorg_detected, trades)[source]#
Update the internal work buffer.
Adds the new trades to the work buffer
Updates the cycle number
Creates the snapshot of the new trades for the client
- Parameters:
- Returns:
Delta of new trades.
Note that there might not be data for blocks towards end_block if there were no trades.
- Return type:
- perform_duty_cycle(verbose=False)[source]#
Update the candle data
Check for block reorganisations
Read new data
Process and index data to candles
- Parameters:
verbose – More debug logging
- Return type:
- abstract fetch_trades(start_block, end_block, tqdm=None)[source]#
Read data from the chain.
Add any new trades using
add_trades()
.- Parameters:
- Raises:
ChainReorganisationDetected – If blockchain detects minor reorganisation during the data ignestion
- Returns:
TradeDelta instance that contains all new trades since start_block.
- Return type: