TradeFeed#

tradingstrategy.direct_feed.trade_feed.TradeFeed Python class in Trading Strategy framework.

class TradeFeed[source]#

Bases: object

Real-time blockchain trade feed.

In-memory latency optimised OHLCv producer for on-chain trades.

  • Keep events in RAM

  • Generate candles based on events

  • Gracefully handle chain reorganisations and drop stale data

__init__(pairs, oracles, reorg_mon, timeframe=<Timeframe freq:1min offset:0 days 00:00:00>, data_retention_time=None, save_hook=None)[source]#

Create new real-time OHLCV tracker.

Parameters:
  • pairs (List[str]) – List of pool addresses, or list of similar identifiers.

  • oracles (Dict[str, BasePriceOracle]) –

    Reference prices for converting ETH or other crypto quoted prices to US dollars.

    In the form of pair -> Price oracle maps.

  • reorg_mon (ReorganisationMonitor) – Reorganisation detector and last good block state for a chain.

  • timeframe (Timeframe) –

    Expected timeframe of the data. Any trade deltas are snapped to the exact timeframe, so that when candle data gets updated, we always update only full candles and never return trade data that would break the timeframe in a middle.

    Default to one minute candles.

  • data_retention_time (Optional[Timedelta]) – Discard entries older than this to avoid filling the RAM.

  • save_hook (Optional[SaveHook]) – Sync the downloaded data to disk.

Methods

__init__(pairs, oracles, reorg_mon[, ...])

Create new real-time OHLCV tracker.

add_trades(trades[, start_block, end_block])

Add trade to the ring buffer with support for fixing chain reorganisations.

backfill_buffer(block_count[, tqdm, save_hook])

Populate the backbuffer before starting real-time tracker.

check_correct_block_range(df, start_block, ...)

Check that trades in the given DataFrame are for correct block range.

check_current_trades_for_duplicates()

Check for duplicate trades.

check_duplicates_data_frame(df)

Check data for duplicate trades.

check_enough_history(required_duration[, ...])

Check that the dafa we have is good time window wise.

check_reorganisations_and_purge()

Check if any of block data has changed

fetch_trades(start_block, end_block[, tqdm])

Read data from the chain.

find_first_included_block_in_candle(ts)

Find the first block number that contains data going into the candle.

get_block_number_of_last_trade()

Get the last block number for which we have good data.

get_exchange_rate(pair)

Get the current exchange rate for the pair.

get_latest_price(pair)

Return the latest price of a pair.

get_latest_trades(n[, pair])

Returns the latest trades.

get_trade_count()

How many trades we track currently.

perform_duty_cycle([verbose])

Update the candle data

restore(df)

Restore data from the previous disk save.

to_pandas(partition_size)

truncate_reorganised_data(latest_good_block)

Discard data because of the chain reorg.

update_cycle(start_block, end_block, ...)

Update the internal work buffer.

__init__(pairs, oracles, reorg_mon, timeframe=<Timeframe freq:1min offset:0 days 00:00:00>, data_retention_time=None, save_hook=None)[source]#

Create new real-time OHLCV tracker.

Parameters:
  • pairs (List[str]) – List of pool addresses, or list of similar identifiers.

  • oracles (Dict[str, BasePriceOracle]) –

    Reference prices for converting ETH or other crypto quoted prices to US dollars.

    In the form of pair -> Price oracle maps.

  • reorg_mon (ReorganisationMonitor) – Reorganisation detector and last good block state for a chain.

  • timeframe (Timeframe) –

    Expected timeframe of the data. Any trade deltas are snapped to the exact timeframe, so that when candle data gets updated, we always update only full candles and never return trade data that would break the timeframe in a middle.

    Default to one minute candles.

  • data_retention_time (Optional[Timedelta]) – Discard entries older than this to avoid filling the RAM.

  • save_hook (Optional[SaveHook]) – Sync the downloaded data to disk.

get_block_number_of_last_trade()[source]#

Get the last block number for which we have good data.

Return type:

Optional[int]

get_trade_count()[source]#

How many trades we track currently.

Return type:

int

add_trades(trades, start_block=None, end_block=None)[source]#

Add trade to the ring buffer with support for fixing chain reorganisations.

Transactions may hop between different blocks when the chain tip reorganises, getting a new timestamp. In this case, we update the

Note

It is safe to call this function multiple times for the same event.

Parameters:
  • start_block (Optional[int]) –

    Expected block range. Inclusive.

    Used for debug assets

  • end_block (Optional[int]) –

    Expected block range. Inclusive.

    Used for debug assets

  • trades (Iterable[Trade]) –

Returns:

DataFrame of new trades

Raises:

ChainReorganisationDetected – If we have detected a block reorganisation during importing the data

Return type:

DataFrame

get_latest_trades(n, pair=None)[source]#

Returns the latest trades.

These trades will be across all trading pairs we are monitoring.

Parameters:
  • n (int) – Number of trades to return

  • pair (Optional[str]) – Optional pair to filter with

Returns:

DataFrame containing n trades.

See Trade for column descriptions.

Return empty DataFrame if no trades.

Return type:

DataFrame

get_latest_price(pair)[source]#

Return the latest price of a pair.

Return the last price record we have.

Parameters:

pair (str) –

Return type:

Decimal

truncate_reorganised_data(latest_good_block)[source]#

Discard data because of the chain reorg.

Parameters:

latest_good_block – The last block that we cannot discard.

check_current_trades_for_duplicates()[source]#

Check for duplicate trades.

Internal sanity check - should not happen.

Dump debug output to error logger if happens.

Raises:

AssertionError – Should not happen

check_enough_history(required_duration, now_=None, tolerance=1.0)[source]#

Check that the dafa we have is good time window wise.

Internal sanity check - should not happen.

Dump debug output to error logger if happens.

Parameters:
  • required_duration (Timedelta) – How far back we need to have data in our buffer.

  • now – UTC timestamp what’s the current time

  • tolerance

    How much tolerance we need.

    Default to 100%, no forgiveness for any lack of data.

    There are several reasons for data mismatch, notably being that we estimate blockchain timepoints using block ranges with expected block time which is not always stable.

  • now_ (Optional[Timestamp]) –

Raises:

AssertionError – If we do not have old enough data

static check_duplicates_data_frame(df)[source]#

Check data for duplicate trades.

  • Bugs in the event reader system may cause duplicate trades

  • All trades are uniquely identified by tx_hash and log_index

  • In a perfectly working system duplicate trades do not happen

Parameters:

df (DataFrame) – Input trades

Raises:

AssertionError – Should not happen

static check_correct_block_range(df, start_block, end_block)[source]#

Check that trades in the given DataFrame are for correct block range.

  • Bugs in the event reader system may cause duplicate trades

  • All trades are uniquely identified by tx_hash and log_index

  • In a perfectly working system duplicate trades do not happen

Parameters:
  • df (DataFrame) – Input trades

  • start_block (int) – First block we expect tradse to have. Inclusive.

  • end_block (int) – Last block we expect tradse to have. Inclusive.

Raises:

AssertionError – Should not happen

check_reorganisations_and_purge()[source]#

Check if any of block data has changed

Returns:

Start block since we need to read new data (inclusive).

Return type:

ChainReorganisationResolution

backfill_buffer(block_count, tqdm=None, save_hook=None)[source]#

Populate the backbuffer before starting real-time tracker.

Parameters:
  • block_count (int) – Number of blocks we need to fetch

  • tqdm (Optional[Type[tqdm]]) –

    A progress visualiser.

    Especially useful during the initial fetch, to show the user how long it takes time to fill the buffer.

    Must be tqdm context manager compatible.

Returns:

Data loaded and filled to the work buffer.

Return type:

TradeDelta

get_exchange_rate(pair)[source]#

Get the current exchange rate for the pair.

  • There is no block number parameter; We always assume the data is filled in order, block by block, so the exchange rate is always the exchange rate of the current block.

  • Price oracles are updates per-block before trade feed for pairs, so price orocles can calculate TWAP or similar exchange rate for the block.

Parameters:

pair (str) –

Return type:

Decimal

find_first_included_block_in_candle(ts)[source]#

Find the first block number that contains data going into the candle.

Parameters:

ts (Timestamp) – Timestamp when the block can start (inclusive)

Returns:

Block number.

If there is no data, return None.

Return type:

Optional[int]

update_cycle(start_block, end_block, reorg_detected, trades)[source]#

Update the internal work buffer.

  • Adds the new trades to the work buffer

  • Updates the cycle number

  • Creates the snapshot of the new trades for the client

Parameters:
  • start_block – Incoming new data, first block (inclusive)

  • end_block – Incoming new data, last block (inclusive)

  • reorg_detected – Did we detect any chain reorganisations in this cycle

  • trades (Iterable[Trade]) – Iterable o new trades

Returns:

Delta of new trades.

Note that there might not be data for blocks towards end_block if there were no trades.

Return type:

TradeDelta

perform_duty_cycle(verbose=False)[source]#

Update the candle data

  1. Check for block reorganisations

  2. Read new data

  3. Process and index data to candles

Parameters:

verbose – More debug logging

Return type:

TradeDelta

restore(df)[source]#

Restore data from the previous disk save.

Parameters:

df (DataFrame) –

abstract fetch_trades(start_block, end_block, tqdm=None)[source]#

Read data from the chain.

Add any new trades using add_trades().

Parameters:
  • start_block (int) – Start reading from this block (inclusive)

  • end_block (Optional[int]) – End at this block (inclusive)

  • tqdm (Optional[Type[tqdm]]) – Optional progress bar displayer gadget.

Raises:

ChainReorganisationDetected – If blockchain detects minor reorganisation during the data ignestion

Returns:

TradeDelta instance that contains all new trades since start_block.

Return type:

Iterable[Trade]