build_liquidity_summary
API documentation for the tradingstrategy.utils.liquidity_filter.build_liquidity_summary Python function.
- build_liquidity_summary(liquidity_df, pair_ids, delay=Timedelta('21 days 00:00:00'))[source]
Build a liquidity summary for the trading pairs:
- Get the historical maximum liquidity of each pair, so we can use it for filtering without survivorship bias.
- Get the most recent liquidity (with a delay of a few days; see the delay argument).
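A minimal call sketch, assuming a Trading Strategy Client and a collection of pair ids are already available (client and our_pair_ids below are illustrative names), condensing the preparation steps from the full example that follows:

# Minimal sketch - client and our_pair_ids are assumed to exist already
liquidity_df = client.fetch_all_liquidity_samples(TimeBucket.d1).to_pandas()
liquidity_df = liquidity_df.loc[liquidity_df.pair_id.isin(our_pair_ids)]
liquidity_df = liquidity_df.set_index("timestamp").groupby("pair_id")

# Forward fill so each pair has a gapless, timestamp-indexed close liquidity series
liquidity_df = forward_fill(liquidity_df, TimeBucket.d1.to_frequency(), columns=("close",))

# Two Counters keyed by pair id, values in USD
pair_liquidity_max_historical, pair_liquidity_today = build_liquidity_summary(liquidity_df, our_pair_ids)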
Example:
chain_id = ChainId.ethereum
time_bucket = TimeBucket.d1  # OHLCV data frequency
liquidity_time_bucket = TimeBucket.d1  # TVL data for Uniswap v3 is only sampled daily, finer granularity is not needed
exchange_slugs = {"uniswap-v3", "uniswap-v2", "sushi"}
exported_top_pair_count = 100
liquidity_comparison_date = floor_pandas_week(pd.Timestamp.now() - pd.Timedelta(days=7))  # What date we use to select top 100 liquid pairs
tokensniffer_threshold = 24  # We want our TokenSniffer score to be higher than this for base tokens
min_liquidity_threshold = 4_000_000  # Prefilter pairs with this liquidity before calling TokenSniffer
allowed_pairs_for_token_sniffer = 150  # How many pairs we let go through the TokenSniffer filtering process (even if still above min_liquidity_threshold)

#
# Set up output files - use Trading Strategy client's cache folder
#

client = Client.create_jupyter_client()
cache_path = client.transport.cache_path

#
# Download - process - save
#

print("Downloading/opening exchange dataset")
exchange_universe = client.fetch_exchange_universe()

# Resolve exchange internal ids
exchanges = [exchange_universe.get_by_chain_and_slug(chain_id, exchange_slug) for exchange_slug in exchange_slugs]
exchange_ids = [exchange.exchange_id for exchange in exchanges]
print(f"Exchange {exchange_slugs} ids are {exchange_ids}")

# We need pair metadata to know which pairs belong to our chosen chain and exchanges
print("Downloading/opening pairs dataset")
pairs_df = client.fetch_pair_universe().to_pandas()
our_chain_pairs = filter_pairs_default(
    pairs_df,
    chain_id=chain_id,
    exchange_ids=exchange_ids,
)

our_chain_pair_ids = our_chain_pairs["pair_id"]

print(f"We have data for {len(our_chain_pair_ids)} trading pairs on {fname} set")

print("Building pair metadata map")
pairs_df = pairs_df.set_index("pair_id")
pair_metadata = {pair_id: row for pair_id, row in pairs_df.iterrows()}
uni_v3_pair_metadata = {pair_id: row for pair_id, row in pairs_df.iterrows() if row["exchange_slug"] == "uniswap-v3"}
print(f"From this, Uniswap v3 data has {len(uni_v3_pair_metadata)} pairs")

# Download all liquidity data, extract
# trading pairs that exceed our prefiltering threshold
print(f"Downloading/opening TVL/liquidity dataset {liquidity_time_bucket}")
liquidity_df = client.fetch_all_liquidity_samples(liquidity_time_bucket).to_pandas()
print(f"Setting up per-pair liquidity filtering, raw liquidity data is {len(liquidity_df)} entries")
liquidity_df = liquidity_df.loc[liquidity_df.pair_id.isin(our_chain_pair_ids)]
liquidity_df = liquidity_df.set_index("timestamp").groupby("pair_id")
print(f"Forward-filling liquidity, before forward-fill the size is {len(liquidity_df)} samples, target frequency is {liquidity_time_bucket.to_frequency()}")
liquidity_df = forward_fill(liquidity_df, liquidity_time_bucket.to_frequency(), columns=("close",))  # Only daily close liquidity is needed for the analysis, don't bother resampling other columns

# Get top liquidity for all of our pairs
print(f"Filtering out historical liquidity of pairs")
pair_liquidity_max_historical, pair_liquidity_today = build_liquidity_summary(liquidity_df, our_chain_pair_ids)
print(f"Chain {chain_id.name} has liquidity data for {len(pair_liquidity_max_historical)} pairs at {liquidity_comparison_date}")

# Check how many pairs did not have good values for liquidity
broken_pairs = {pair_id for pair_id, liquidity in pair_liquidity_max_historical.items() if liquidity < 0}
print(f"Liquidity data is broken for {len(broken_pairs)} trading pairs")

uniswap_v3_liquidity_pairs = {pair_id for pair_id in pair_liquidity_max_historical.keys() if pair_id in uni_v3_pair_metadata}
print(f"From this, Uniswap v3 is {len(uniswap_v3_liquidity_pairs)} pairs")
assert len(uniswap_v3_liquidity_pairs) > 0, "No Uniswap v3 liquidity detected"

# Remove duplicate pairs (the same ticker listed in multiple pools),
# keeping the pool with the highest historical liquidity
print("Prefiltering and removing duplicate pairs")
top_liquid_pairs_filtered = Counter()
seen_tickers = set()
for pair_id, liquidity in pair_liquidity_max_historical.most_common():
    ticker = make_simple_ticker(pair_metadata[pair_id])
    if liquidity < min_liquidity_threshold:
        # Prefilter pairs
        continue
    if ticker in seen_tickers:
        # This pair is already in the dataset under a different pool
        # with more liquidity
        continue
    seen_tickers.add(ticker)
    top_liquid_pairs_filtered[pair_id] = liquidity

print(f"After prefilter, we have {len(top_liquid_pairs_filtered):,} pairs left")
- Parameters:
liquidity_df (pandas.core.frame.DataFrame | pandas.core.groupby.generic.DataFrameGroupBy) –
Liquidity data. MUST BE forward-filled so it has no gaps, and indexed by timestamp.
Must be in a daily/weekly timeframe to include TVL data and match our lookup functions.
pair_ids (Union[Collection[int], Series]) – Pairs we are interested in
delay –
The time lag applied when reading the “current” (today’s) liquidity.
The lag makes sure we only read timestamps that have already been indexed by the time this code runs.
- Returns:
Two counters keyed by pair id: historical max liquidity, and liquidity today.
All in USD.
Pair liquidity value is set to -1 if the lookup failed (data not available, data contains unrealistic values, etc.).
- Return type:
tuple[collections.Counter[int, float], collections.Counter[int, float]]
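Because failed lookups are flagged with -1, a typical consumer drops those sentinel values before ranking pairs by historical liquidity. A post-processing sketch, assuming the inputs from the minimal sketch above (the 7-day delay override and variable names are illustrative only, not defaults of the library):

import pandas as pd
from collections import Counter

# Optionally tighten the lag used for the "current" liquidity reading
pair_liquidity_max_historical, pair_liquidity_today = build_liquidity_summary(
    liquidity_df,
    our_pair_ids,
    delay=pd.Timedelta(days=7),
)

# Drop pairs whose lookup failed (value set to -1)
valid_max = Counter({pair_id: usd for pair_id, usd in pair_liquidity_max_historical.items() if usd >= 0})

# Rank by all-time max liquidity (avoids survivorship bias) and keep the top 100
top_pair_ids = [pair_id for pair_id, usd in valid_max.most_common(100)]
print(f"Top pair {top_pair_ids[0]} currently has {pair_liquidity_today[top_pair_ids[0]]:,.0f} USD of liquidity")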