# Source code for tradingstrategy.utils.liquidity_filter

"""Filtering Uniswap trading pairs for liquidity.

- Used to build a trading pair universe with tradeable pairs of enough liquidity, no survivorship bias

- See :py:func:`build_liquidity_summary` for usage

"""
from collections import Counter
from typing import Collection

import pandas as pd
from pandas.core.groupby import DataFrameGroupBy

from tradingstrategy.types import USDollarAmount, PrimaryKey
from tradingstrategy.utils.time import floor_pandas_week


def get_somewhat_realistic_max_liquidity(
    liquidity_df,
    pair_id,
    samples=10,
    broken_liquidity=100_000_000,
) -> float:
    """Get the max liquidity of a trading pair over its history.

    - Get the token by its maximum ever liquidity, so we avoid survivorship bias

    - Instead of picking the absolute top, we pick n top samples and choose lowest of those

    - This allows us to avoid data sampling issues when the liquidity value,
      as calculated with the function of price, might have been weird when the token launched

    :param liquidity_df:
        Pair-id grouped liquidity data; ``liquidity_df.obj`` must be indexed so
        that ``.loc[pair_id]["close"]`` yields the close-liquidity series.

    :param pair_id:
        Trading pair internal id to look up.

    :param samples:
        How many of the top liquidity samples to consider before taking their minimum.

    :param broken_liquidity:
        Cannot have more than 100M USD - treat larger values as broken data.

    :return:
        Max liquidity in US dollars, or ``-1`` if data is missing or broken.
    """
    try:
        liquidity_samples = liquidity_df.obj.loc[pair_id]["close"].nlargest(samples)

        # Pair exists but has no close samples at all -
        # min() of an empty Series would raise ValueError, treat as missing data
        if len(liquidity_samples) == 0:
            return -1

        sample = min(liquidity_samples)

        if sample > broken_liquidity:
            # Filter out bad data
            return -1

        return sample
    except KeyError:
        # Pair not available, because liquidity data is not there, or zero, or broken
        return -1
def get_liquidity_today(
    liquidity_df,
    pair_id,
    delay=pd.Timedelta(days=30),
) -> float:
    """Get the current liquidity of a trading pair.

    Looks up a single close-liquidity sample at a week-floored timestamp
    in the recent past.

    :param liquidity_df:
        Pair-id grouped liquidity data; ``liquidity_df.obj`` must support
        ``.loc[pair_id]["close"]`` lookups.

    :param pair_id:
        Trading pair internal id.

    :param delay:
        Look back X days. To avoid indexer delays.

    :return:
        US dollars, or ``-1`` when no sample is available for the pair/timestamp.
    """
    try:
        lookup_ts = floor_pandas_week(pd.Timestamp.now() - delay)
        close_series = liquidity_df.obj.loc[pair_id]["close"]
        return close_series[lookup_ts]
    except KeyError:
        # Pair not available, because liquidity data is not there, or zero, or broken
        return -1
def build_liquidity_summary(
    liquidity_df: pd.DataFrame | DataFrameGroupBy,
    pair_ids: Collection[PrimaryKey] | pd.Series,
    delay=pd.Timedelta(days=21),
) -> tuple["Counter[PrimaryKey]", "Counter[PrimaryKey]"]:
    """Build a liquidity status of the trading pairs.

    - Get the historical max liquidity of a pair, so we can use this for
      filtering without survivorship bias

    - Get the most recent liquidity (w/delay of few days)

    Example (abbreviated):

    .. code-block:: python

        # Download TVL/liquidity data and narrow it down to our chain's pairs
        liquidity_df = client.fetch_all_liquidity_samples(liquidity_time_bucket).to_pandas()
        liquidity_df = liquidity_df.loc[liquidity_df.pair_id.isin(our_chain_pair_ids)]
        liquidity_df = liquidity_df.set_index("timestamp").groupby("pair_id")

        # Forward fill gaps; only daily close liquidity is needed for analysis
        liquidity_df = forward_fill(liquidity_df, liquidity_time_bucket.to_frequency(), columns=("close",))

        # Get top liquidity for all of our pairs
        pair_liquidity_max_historical, pair_liquidity_today = build_liquidity_summary(liquidity_df, our_chain_pair_ids)

        # Check how many pairs did not have good values for liquidity
        broken_pairs = {pair_id for pair_id, liquidity in pair_liquidity_max_historical.items() if liquidity < 0}
        print(f"Liquidity data is broken for {len(broken_pairs)} trading pairs")

        # Prefilter and remove duplicate pairs (same ticker in multiple pools)
        top_liquid_pairs_filtered = Counter()
        for pair_id, liquidity in pair_liquidity_max_historical.most_common():
            ticker = make_simple_ticker(pair_metadata[pair_id])
            if liquidity < min_liquidity_threshold:
                continue
            if ticker in top_liquid_pairs_filtered:
                # This pair is already in the dataset under a different pool with more liquidity
                continue
            top_liquid_pairs_filtered[pair_id] = liquidity

    :param liquidity_df:
        Liquidity data.

        **MUST BE forward filled for no gaps and timestamp indexed.**

        Must be daily/weekly timeframe to include TVL data and match our lookup functions.

    :param pair_ids:
        Pairs we are interested in.

    :param delay:
        The time lag to check the "current" today's liquidity.

        Ensure the data is indexed by the time we run this code.

    :return:
        Two counters of historical max liquidity, liquidity today. All in USD.

        Pair liquidity value is set to ``-1`` if the lookup failed
        (data not available, data contains unrealistic values, etc.)
    """
    assert isinstance(liquidity_df, (pd.DataFrame, DataFrameGroupBy))
    assert isinstance(pair_ids, (pd.Series, list, set, tuple))
    assert len(pair_ids) > 0

    if not isinstance(liquidity_df, DataFrameGroupBy):
        # TODO: This is unlikely to work but let's try be helpful anyway
        liquidity_df = liquidity_df.set_index("timestamp").groupby("pair_id")

    # Get top liquidity for all of our pairs.
    # Counter values are USDollarAmount floats keyed by pair id.
    pair_liquidity_max_historical: Counter[PrimaryKey] = Counter()
    pair_liquidity_today: Counter[PrimaryKey] = Counter()

    for pair_id in pair_ids:
        pair_liquidity_max_historical[pair_id] = get_somewhat_realistic_max_liquidity(liquidity_df, pair_id)
        pair_liquidity_today[pair_id] = get_liquidity_today(liquidity_df, pair_id, delay=delay)

    return pair_liquidity_max_historical, pair_liquidity_today