"""Liquidity data feed manipulation.
For more information about liquidity in automated market making pools, see :term:`AMM`
and :term:`XY liquidity model`.
"""
from dataclasses import dataclass
from typing import List, Optional, Tuple, Dict, cast
import pandas as pd
import pyarrow as pa
from dataclasses_json import dataclass_json
from tradingstrategy.timebucket import TimeBucket
from tradingstrategy.types import UNIXTimestamp, USDollarAmount, BlockNumber, PrimaryKey
from tradingstrategy.utils.groupeduniverse import PairGroupedUniverse
from tradingstrategy.utils.time import naive_utcfromtimestamp
# Preconstructed pd.Timedelta for optimisation
_ZERO_TIMEDELTA = pd.Timedelta(0)
class LiquidityDataUnavailable(Exception):
"""We tried to look up liquidity info for a trading pair, but count not find a sample."""
@dataclass_json
@dataclass
class XYLiquidity:
"""Data structure that presents TVL/liquidity status in a DEX pool.
.. note ::
The class name is misleading, as nowadays this class presents
both :term:`XY Liquidity` and :term:`TVL` in :term:`CLMM` DEX.
    This data structure is for a naive x*y=k :term:`AMM` pool.

    Liquidity is not part of normal :term:`technical analysis`,
    so the dataset server provides separate datasets for it.
Liquidity is expressed as US dollar value of the :term:`quote token` of the pool.
For example if the pool is 50 $FOO in reserve0 and 50 $USDC in reserve1, the
liquidity of the pool would be expressed as 50 USD.
    Liquidity events, like :term:`candles <candle>`, have open, high, low and close values,
    depending on at which point of the candle they were sampled.
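
    A minimal sketch of how the one-sided dollar value above is derived
    (hypothetical reserves; any USD stablecoin is assumed to be 1:1 to USD):

    .. code-block:: python

        reserve0 = 50.0  # $FOO, the base token
        reserve1 = 50.0  # $USDC, the quote token
        usd_exchange_rate = 1.0  # stablecoin assumed 1:1
        liquidity_usd = reserve1 * usd_exchange_rate
        assert liquidity_usd == 50.0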
"""
    #: Primary key to identify the trading pair.
    #: Use the pair universe to map this to a chain id and a smart contract address.
pair_id: PrimaryKey
#: Open timestamp for this time bucket.
timestamp: UNIXTimestamp
#: USD exchange rate of the quote token used to
#: convert to dollar amounts in this time bucket.
#:
#: Note that currently any USD stablecoin (USDC, DAI) is
#: assumed to be 1:1 and the candle server cannot
#: handle exchange rate difference among stablecoins.
#:
#: The rate is taken at the beginning of the 1 minute time bucket.
#: For other time buckets, the exchange rate is the simple average
#: for the duration of the bucket.
exchange_rate: float
    #: Liquidity absolute value in the pool at the open of this time bucket.
#: Note - for minute candles - if the candle contains only one event (mint, burn, sync)
#: the open liquidity value is the value AFTER this event.
#: The dataset server does not track the closing value of the previous liquidity event.
#: This applies for minute candles only.
open: USDollarAmount
    #: Liquidity absolute value in the pool at the close of this time bucket
    close: USDollarAmount

    #: Highest liquidity absolute value in the pool during this time bucket
    high: USDollarAmount

    #: Lowest liquidity absolute value in the pool during this time bucket
    low: USDollarAmount
    #: Number of liquidity supplied events for the pool
adds: int
#: Number of liquidity removed events for the pool
removes: int
    #: Total number of events affecting liquidity during the time window.
    #: This includes adds, removes, swaps and sync().
syncs: int
    #: How much new liquidity was supplied, in terms of the quote token converted to US dollars
    add_volume: USDollarAmount

    #: How much liquidity was removed, in terms of the quote token converted to US dollars
    remove_volume: USDollarAmount
#: Blockchain tracking information
start_block: BlockNumber
#: Blockchain tracking information
end_block: BlockNumber
def __repr__(self):
human_timestamp = naive_utcfromtimestamp(self.timestamp)
        return f"@{human_timestamp} O:{self.open} H:{self.high} L:{self.low} C:{self.close} AV:{self.add_volume} RV:{self.remove_volume} A:{self.adds} R:{self.removes} SB:{self.start_block} EB:{self.end_block}"
    @classmethod
def to_pyarrow_schema(cls, small_candles=False) -> pa.Schema:
"""Construct schema for writing Parquet filess for these candles.
:param small_candles: Use even smaller word sizes for frequent (1m) candles.
"""
schema = pa.schema([
("pair_id", pa.uint32()),
("timestamp", pa.timestamp("s")),
("exchange_rate", pa.float64()),
("open", pa.float64()),
("close", pa.float64()),
("high", pa.float64()),
("low", pa.float64()),
("adds", pa.uint16() if small_candles else pa.uint32()),
("removes", pa.uint16() if small_candles else pa.uint32()),
("syncs", pa.uint16() if small_candles else pa.uint32()),
("add_volume", pa.float64()),
("remove_volume", pa.float64()),
("start_block", pa.uint32()), # Should we good for 4B blocks
("end_block", pa.uint32()),
])
return schema
    @classmethod
def to_dataframe(cls) -> pd.DataFrame:
"""Return emptry Pandas dataframe presenting liquidity sample."""
# TODO: Does not match the spec 1:1 - but only used as empty in tests
fields = dict([
("pair_id", "int"),
("timestamp", "datetime64[s]"),
("exchange_rate", "float64"),
("open", "float64"),
("close", "float64"),
("high", "float64"),
("low", "float64"),
("buys", "float64"),
("sells", "float64"),
("add_volume", "float64"),
("remove_volume", "float64"),
("start_block", "UInt32"),
("end_block", "UInt32"),
])
df = pd.DataFrame(columns=fields.keys())
return df.astype(fields)
    @classmethod
def convert_web_candles_to_dataframe(cls, web_candles: list[dict]) -> pd.DataFrame:
"""Return Pandas dataframe presenting TVL data fetched from JSON endpoint.
Convert JSON data to Pandas.
Uses `/candles` endpoint data format https://tradingstrategy.ai/api/explorer/#/Trading%20pair/web_candles
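
        Example (a sketch; the payload shape is inferred from the column mapping below):

        .. code-block:: python

            web_candles = [
                {"ts": 1640995200, "o": 100.0, "h": 110.0, "l": 95.0, "c": 105.0},
            ]
            df = XYLiquidity.convert_web_candles_to_dataframe(web_candles)
            assert df.index[0] == pd.Timestamp("2022-01-01")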
"""
        # Schema definition for :py:class:`pd.DataFrame`:
        #
        # Defines Pandas datatypes for columns in our candle data format.
        # Useful e.g. when we are manipulating JSON/hand-written data.
        #
WEB_DATAFRAME_FIELDS = dict([
("timestamp", "datetime64[s]"),
("open", "float"),
("close", "float"),
("high", "float"),
("low", "float"),
])
        # Handle the special case where the API endpoint did not return any data
if len(web_candles) == 0:
# Create an empty DataFrame with the specified columns
df = pd.DataFrame(columns=['open', 'high', 'low', 'close', 'timestamp', 'pair_id'])
# Set the index to a DateTimeIndex
df.index = pd.DatetimeIndex([], name='timestamp')
return df
df = pd.DataFrame(web_candles)
df = df.rename(columns={
"ts": "timestamp",
"o": "open",
"h": "high",
"l": "low",
"c": "close",
})
df = df.astype(WEB_DATAFRAME_FIELDS)
# Convert unix timestamps to Pandas
df["timestamp"] = pd.to_datetime(df["timestamp"])
# Set timestamp as the index, but also retain as a column
# TODO: In the future use (pair_id, timestamp) MultiIndex for everything
df = df.set_index("timestamp", drop=False)
return df
@dataclass_json
@dataclass
class LiquidityResult:
"""Server-reply for live queried liquidity data."""
    #: A list of liquidity events.
    #: Events are unordered and subject to client-side sorting.
    #: Multiple pairs and chains may be present in the events.
liquidity_events: List[XYLiquidity]
    def sort_by_timestamp(self):
        """In-place sort of the liquidity events by their timestamp."""
        self.liquidity_events.sort(key=lambda c: c.timestamp)
class GroupedLiquidityUniverse(PairGroupedUniverse):
    """A universe where each trading pair has its own liquidity data feed.

    This is a helper class that creates the foundation for multi-pair strategies.

    For data logistics purposes, all candles are lumped together in a single columnar data blob.
    However, it rarely makes sense to execute operations over different trading pairs.
    :py:class:`GroupedLiquidityUniverse` creates a trading pair id -> liquidity sample grouping
    out of the raw liquidity samples.
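
    Example of constructing the universe from a raw liquidity dataset
    (a sketch; ``client`` is assumed to be a Trading Strategy ``Client`` instance):

    .. code-block:: python

        df = client.fetch_all_liquidity_samples(TimeBucket.d1).to_pandas()
        liquidity_universe = GroupedLiquidityUniverse(df)
        samples = liquidity_universe.get_liquidity_samples_by_pair(pair_id=1)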
"""
    def __init__(
self,
df: pd.DataFrame,
time_bucket=TimeBucket.d1,
timestamp_column="timestamp",
index_automatically=True,
forward_fill=False,
):
super().__init__(
df,
time_bucket,
timestamp_column,
index_automatically,
bad_open_close_threshold=None, # Applicable to prices only
forward_fill=forward_fill,
)
    def get_liquidity_samples_by_pair(self, pair_id: PrimaryKey) -> Optional[pd.DataFrame]:
"""Get samples for a single pair.
:return:
If the pair does not exist return `None`.
"""
try:
return self.get_samples_by_pair(pair_id)
except KeyError:
return None
    def get_liquidity_with_tolerance(
self,
pair_id: PrimaryKey,
when: pd.Timestamp,
tolerance: pd.Timedelta,
kind="close"
) -> Tuple[USDollarAmount, pd.Timedelta]:
"""Get the available liquidity for a trading pair at a specific time point/
The liquidity is defined as one-sided as in :term:`XY liquidity model`.
Example:
.. code-block:: python
liquidity_amount, last_trade_delay = liquidity_universe.get_liquidity_with_tolerance(
sushi_usdt.pair_id,
pd.Timestamp("2021-12-31"),
tolerance=pd.Timedelta("1y"),
)
assert liquidity_amount == pytest.approx(2292.4517)
assert last_trade_delay == pd.Timedelta('4 days 00:00:00')
:param pair_id:
Trading pair id
:param when:
Timestamp to query
:param kind:
One of OHLC data points: "open", "close", "low", "high"
:param tolerance:
            If there is no liquidity sample available at the exact timepoint,
            look to the past to get the nearest sample.
            For example, if the candle time interval is 5 minutes and the tolerance
            is ``pd.Timedelta("50m")``, then accept a sample that is at most
            50 minutes (10 candles) before the timepoint.
:return:
            Return a (liquidity, delay) tuple.
            We always return a liquidity value. In the error cases an exception is raised.
            The delay is the timedelta between the wanted timestamp
            and the actual timestamp of the sample.
Candles are always timestamped by their opening.
:raise LiquidityDataUnavailable:
There were no samples available with the given condition.
This can happen when
            - There have not been any trades yet

            - There have not been any trades within the `tolerance`
              time window
"""
assert kind in ("open", "close", "high", "low"), f"Got kind: {kind}"
last_allowed_timestamp = when - tolerance
try:
candles_per_pair = self.get_samples_by_pair(pair_id)
except KeyError as e:
raise LiquidityDataUnavailable(f"Pair {pair_id} does not contain any liquidity samples at all") from e
samples_per_kind = candles_per_pair[kind]
# The indexes we can have are
# - MultiIndex (pair id, timestamp) - if multiple trading pairs present
# - DatetimeIndex - if single trading pair present
if isinstance(candles_per_pair.index, pd.MultiIndex):
timestamp_index = cast(pd.DatetimeIndex, candles_per_pair.index.get_level_values(1))
elif isinstance(candles_per_pair.index, pd.DatetimeIndex):
timestamp_index = candles_per_pair.index
else:
raise NotImplementedError(f"Does not know how to handle index {candles_per_pair.index}")
# https://pandas.pydata.org/docs/reference/api/pandas.Index.get_indexer.html
# https://stackoverflow.com/questions/71027193/datetimeindex-get-loc-is-deprecated
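        # method="pad" picks the most recent sample at or before `when`;
        # get_indexer() returns -1 when `when` predates all samples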
indexer = timestamp_index.get_indexer([when], method="pad")
if indexer[0] != -1:
discrete_ts_idx = indexer[0]
else:
first_sample = candles_per_pair.index[0]
raise LiquidityDataUnavailable(f"Pair {pair_id} does not have liquidity/TVL data older data than {first_sample}. Tried to look up {when}.")
sample_timestamp = timestamp_index[discrete_ts_idx]
latest_value = candles_per_pair.iloc[discrete_ts_idx][kind]
distance = when - sample_timestamp
assert distance >= _ZERO_TIMEDELTA, f"Somehow we managed to get a timestamp {sample_timestamp} that is newer than asked {when}"
if sample_timestamp >= last_allowed_timestamp:
# Return the chosen price column of the sample
return latest_value, distance
# Try to be helpful with the errors here,
# so one does not need to open ipdb to inspect faulty data
try:
first_sample = candles_per_pair.iloc[0]
second_sample = candles_per_pair.iloc[1]
last_sample = candles_per_pair.iloc[-1]
        except (KeyError, IndexError):
raise LiquidityDataUnavailable(
f"Could not find any liquidity samples for pair {pair_id}, value kind '{kind}', between {when} - {last_allowed_timestamp}\n"
f"Could not figure out existing data range. Has {len(samples_per_kind)} samples."
)
raise LiquidityDataUnavailable(
f"Could not find any liquidity samples for pair {pair_id}, value kind '{kind}', between {when} - {last_allowed_timestamp}\n"
f"The pair has {len(samples_per_kind)} candles between {first_sample['timestamp']} - {last_sample['timestamp']}\n"
f"Sample interval is {second_sample['timestamp'] - first_sample['timestamp']}\n"
f"\n"
f"This is usually due to sparse candle data - trades have not been made or the blockchain was halted during the price look-up period.\n"
f"Try to increase look back perid in your code."
)
    def get_closest_liquidity(self, pair_id: PrimaryKey, when: pd.Timestamp, kind="open", look_back_time_frames=5) -> USDollarAmount:
        """Get the available liquidity for a trading pair at a specific timepoint or some candles before the timepoint.
The liquidity is defined as one-sided as in :term:`XY liquidity model`.
.. warning::
This is an early alpha method and has been deprecated.
Please use :py:meth:`get_liquidity_with_tolerance` instead.
        :param pair_id: Trading pair id
:param when: Timestamp to query
:param kind: One of liquidity samples: "open", "close", "low", "high"
        :param look_back_time_frames: If there is no liquidity sample available at the exact timepoint,
            look to the past to get the nearest sample
        :return: Liquidity sample value. We always return a value; in the error cases an exception is raised.
:raise LiquidityDataUnavailable: There was no liquidity sample available
"""
assert kind in ("open", "close", "high", "low"), f"Got kind: {kind}"
start_when = when
samples_per_pair = self.get_liquidity_samples_by_pair(pair_id)
assert samples_per_pair is not None, f"No liquidity data available for pair {pair_id}"
samples_per_kind = samples_per_pair[kind]
for attempt in range(look_back_time_frames):
try:
sample = samples_per_kind[when]
return sample
except KeyError:
# Go to the previous sample
when -= self.time_bucket.to_timedelta()
raise LiquidityDataUnavailable(f"Could not find any liquidity samples for pair {pair_id} between {when} - {start_when}")
    @staticmethod
def create_empty() -> "GroupedLiquidityUniverse":
"""Create a liquidity universe without any data."""
return GroupedLiquidityUniverse(df=XYLiquidity.to_dataframe(), index_automatically=False)
class ResampledLiquidityUniverse:
"""Backtesting performance optimised version of liquidity universe.
- The class is designed to be used with strategies that trade
a large number of pairs and picks pairs by their liquidity criteria,
which varies over time
- Because liquidity information does not need to be as accurate as pricing information,
and a rough estimate of liquidity is enough for most strategies, we can preconstruct
an optimised version of liquidity universe
    - This version of liquidity universe is designed for the maximum speed of
      :py:meth:`get_liquidity_fast` with (pair id, timestamp) parameters
    - Some of the speed is achieved by resampling liquidity data
      to a longer time frame (day, week) ahead of time
- Some of the speed is achieved by using extra indexes and memory
- Some of the speed is achieved by forward filling any data gaps,
so we do not need to worry about the sparse data
    The speed difference between :py:meth:`GroupedLiquidityUniverse.get_liquidity_with_tolerance`
    and :py:meth:`get_liquidity_fast` is more than 10x.

    In practice, the speed increase for a multipair liquidity-aware strategy backtest is
    2x - 3x.
.. note ::
        The resampling itself will take some time, so optimally
        the resampled data should be stored in an on-disk cache between different backtest runs.
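
    Example of constructing an optimised universe (a sketch, assuming
    ``liquidity_df`` is a raw liquidity dataframe as accepted by
    :py:class:`GroupedLiquidityUniverse`):

    .. code-block:: python

        universe = ResampledLiquidityUniverse(
            liquidity_df,
            column="close",
            resample_period="1D",
            resample_method="min",
        )
        usd_tvl = universe.get_liquidity_fast(pair_id=1, when=pd.Timestamp("2022-01-01"))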
"""
    def __init__(self,
df: pd.DataFrame,
column="close",
resample_period="1D",
resample_method="min",
):
"""Calculate optimised liquidity universe.
        - Only extract the column we are interested in

        - Resample the data to a coarser time frame

        - Aggregate each resample period with the given method
.. note::
For a large number of pairs, resampling will take a long time.
You should remove any unnecessary pairs in `df` before creating a resampled version.
:param df:
Liquidity dataframe.
You should prepare this by dropping any unwanted pairs.
:param column:
Which column value we use for the resampled series.
Defaults to (daily) close liquidity value.
:param resample_period:
Pandas frequency alias string for the new timeframe duration.
E.g. `1D` for daily.
Must be a fixed frequency e.g. `30D` instead of `M`.
See https://stackoverflow.com/a/35339226/315168
:param resample_method:
            How we resample the liquidity within each period, e.g. ``min``
"""
new_df = df[["timestamp", "pair_id"]].copy()
new_df["value"] = df[[column]]
new_df.set_index(new_df["timestamp"], inplace=True, drop=True)
grouped_df = new_df.groupby("pair_id")
# https://pandas.pydata.org/docs/reference/api/pandas.core.groupby.DataFrameGroupBy.resample.html
#
# Important to set origin="epoch" here, or otherwise
# pd.Timestamp().floor() does not match our range values in get_liquidity_fast()
#
resampled_df = grouped_df.resample(resample_period, origin="epoch").agg({"value": resample_method}).ffill()
self.resample_period = resample_period
self.df: pd.DataFrame = resampled_df
self.pair_cache: Dict[PrimaryKey, pd.DataFrame] = {}
    def get_samples_by_pair(self, pair_id: int) -> pd.DataFrame:
"""Access and cache the resampled liquidity data per pair."""
# https://stackoverflow.com/a/45563615/315168
if pair_id not in self.pair_cache:
try:
self.pair_cache[pair_id] = self.df.xs(pair_id)
except KeyError as e:
raise KeyError(f"Could not find pair for pair_id {pair_id}") from e
return self.pair_cache[pair_id]
    def get_liquidity_fast(self,
pair_id: PrimaryKey,
when: pd.Timestamp) -> USDollarAmount:
"""Get the available liquidity for a trading pair at a specific time point.
        The liquidity is read from the resampled buffer.
        The lookup is fast, because any timestamp is rounded down to the nearest
        full timestamp of the resample frequency and looked up there.
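
        Example (illustrative; assumes ``resample_period="1D"``):

        .. code-block:: python

            # Any intraday timestamp resolves to the same daily sample
            a = universe.get_liquidity_fast(pair_id, pd.Timestamp("2022-01-01 00:05"))
            b = universe.get_liquidity_fast(pair_id, pd.Timestamp("2022-01-01 23:59"))
            assert a == b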
:param pair_id:
Trading pair id
:param when:
Timestamp to query
:return:
            Return available liquidity or `0` if no liquidity was seen before the timestamp.
"""
assert isinstance(when, pd.Timestamp)
try:
samples = self.get_samples_by_pair(pair_id)
except KeyError as e:
raise LiquidityDataUnavailable(f"No liquidity data for {pair_id}") from e
try:
rounded_ts = when.floor(self.resample_period)
return samples.loc[rounded_ts]["value"]
except KeyError:
return 0.0