"""Wrangle incoming data.
- Wrangle is a process where we massage incoming price/liquidity data for the issues we may have encountered during the data collection
- Common DEX data issues are absurd price high/low spikes due to MEV trades
- We also have some open/close values that are "broken" in a sense that they do not reflect the market price you would be able to trade,
again likely due to MEV
- See :py:func:`fix_dex_price_data` for fixing
"""
import datetime
import logging
from itertools import islice

import numpy as np
import pandas as pd
from pandas.core.groupby import DataFrameGroupBy

from .time import naive_utcnow
from .forward_fill import forward_fill as _forward_fill
logger = logging.getLogger(__name__)
def fix_bad_wicks(
    df: pd.DataFrame,
    threshold=(0.1, 1.9),
    too_slow_threshold=15,
    verbose=False,
    bad_open_close_threshold: float | None = 3.0,
) -> pd.DataFrame:
    """Correct out bad high/low values in OHLC data.

    Applicable for both :term:`OHLCV` price feeds and liquidity feeds.

    On :term:`Uniswap` v2 and compatibles, bad wicks are caused by e.g. a very large flash loan,
    oracle price manipulation attacks and misbehaving bots.

    This function removes bad high/low values and sets them to open/close if they seem to be wildly out of sample.

    :param df:
        OHLC(V) dataframe with ``open``, ``high``, ``low`` and ``close`` columns.
        Columns are corrected in place.

    :param threshold:
        How many pct % wicks are allowed through,
        as a tuple (low threshold, high threshold) relative to close.
        With the default ``(0.1, 1.9)``, a low cannot be more than 90% below close
        and a high cannot be more than 90% above close; out-of-band values are
        replaced with close. Set to ``None`` to skip wick fixing.

    :param too_slow_threshold:
        Log a warning if this takes longer than this many seconds.

    :param bad_open_close_threshold:
        How many X open must be above the high to be considered a broken data point.
        The open price will be replaced with high price.

        Do not set for liquidity processing.

    :param verbose:
        Make some debug logging when using the function for manual data diagnostics.

    :return:
        The same dataframe, with bad values corrected in place.
    """
    start = naive_utcnow()

    if len(df) == 0:
        return df

    # Optimised with np.where()
    # https://stackoverflow.com/a/65729035/315168
    if threshold is not None:
        df["high"] = np.where(df["high"] > df["close"] * threshold[1], df["close"], df["high"])
        df["low"] = np.where(df["low"] < df["close"] * threshold[0], df["close"], df["low"])

    # For manual diagnostics tracking down bad trading pair data
    if verbose and bad_open_close_threshold:
        bad_opens = df[df["open"] > df["high"] * bad_open_close_threshold]
        # Log only the first 10 offending rows to keep the output readable
        for idx, row in islice(bad_opens.iterrows(), 10):
            logger.warning(
                "Pair id %d, timestamp: %s, open: %s, high: %s, buy volume: %s sell volume: %s, volume: %s",
                row.pair_id,
                row.timestamp,
                row.open,
                row.high,
                row.get("buy_volume"),
                row.get("sell_volume"),
                row.get("volume"),
            )
        # logger.warn() is deprecated in favour of logger.warning()
        logger.warning("Total %d bad open price entries detected", len(bad_opens))

    # Issues in open price values with data point - open cannot be higher than high.
    # Not strictly "wicks" but we fix all data while we are at it.
    if bad_open_close_threshold:
        df["open"] = np.where(df["open"] > df["high"] * bad_open_close_threshold, df["high"], df["open"])
        df["close"] = np.where(df["close"] > df["high"] * bad_open_close_threshold, df["high"], df["close"])

    duration = naive_utcnow() - start

    if duration > datetime.timedelta(seconds=too_slow_threshold):
        logger.warning("Very slow fix_bad_wicks(): %s", duration)

    return df
def filter_bad_wicks(df: pd.DataFrame, threshold=(0.1, 1.9)) -> pd.DataFrame:
    """Mark the bad wicks.

    On :term:`Uniswap` v2 and compatibles, bad wicks are caused by e.g. very large flash loan, oracle price manipulation attacks,
    and misbehaving bots.

    Unlike :py:func:`fix_bad_wicks`, this function does not modify the data -
    it returns the subset of rows that would be considered bad.

    :param df:
        OHLC dataframe with ``high``, ``low`` and ``close`` columns.

    :param threshold:
        How many pct % wicks are allowed through as (low, high) tuple.

        This is a tuple (low threshold, high threshold).

        If low < close * threshold[0] the row is flagged.

        If high > close * threshold[1] the row is flagged.

    :return:
        Subset of ``df`` containing only the rows with out-of-band high/low values.
    """
    df_matches = df.loc[
        (df["high"] > df["close"] * threshold[1]) | (df["low"] < df["close"] * threshold[0])
    ]
    return df_matches
def remove_zero_candles(
    df: pd.DataFrame,
) -> pd.DataFrame:
    """Remove any candle that has a zero value for OHLC

    :param df: Dataframe that may contain zero candles
    :return: pd.Dataframe
    """
    # Nothing to filter - hand back the frame untouched
    if len(df) == 0:
        return df
    # A row survives only when all four price columns are non-zero
    keep = (df["open"] != 0) & (df["high"] != 0) & (df["low"] != 0) & (df["close"] != 0)
    return df[keep]
def fix_dex_price_data(
    df: pd.DataFrame | DataFrameGroupBy,
    freq: pd.DateOffset | str | None = None,
    forward_fill: bool = True,
    bad_open_close_threshold: float | None = 3.0,
    fix_wick_threshold: tuple | None = (0.1, 1.9),
    remove_candles_with_zero: bool = True,
) -> pd.DataFrame:
    """Wrangle DEX price data for all known issues.

    - Fix broken open/high/low/close value so that they are less likely to cause problems for algorithms

    - Wrangle is a process where we massage incoming price/liquidity data for the issues we may have encountered during the data collection

    - Common DEX data issues are absurd price high/low spikes due to MEV trades

    - We also have some open/close values that are "broken" in a sense that they do not reflect the market price you would be able to trade,
      again likely due to MEV

    Example:

    .. code-block:: python

        # After we know pair ids that fill the liquidity criteria,
        # we can build OHLCV dataset for these pairs
        print(f"Downloading/opening OHLCV dataset {time_bucket}")
        price_df = client.fetch_all_candles(time_bucket).to_pandas()
        print(f"Filtering out {len(top_liquid_pair_ids)} pairs")
        price_df = price_df.loc[price_df.pair_id.isin(top_liquid_pair_ids)]

        print("Wrangling DEX price data")
        price_df = price_df.set_index("timestamp", drop=False).groupby("pair_id")
        price_df = fix_dex_price_data(
            price_df,
            freq=time_bucket.to_frequency(),
            forward_fill=True,
        )

        print(f"Retrofitting OHLCV columns for human readability")
        price_df = price_df.obj
        price_df["pair_id"] = price_df.index.get_level_values(0)
        price_df["ticker"] = price_df.apply(lambda row: make_full_ticker(pair_metadata[row.pair_id]), axis=1)
        price_df["link"] = price_df.apply(lambda row: make_link(pair_metadata[row.pair_id]), axis=1)

        # Export data, make sure we got columns in an order we want
        print(f"Writing OHLCV CSV")
        del price_df["timestamp"]
        del price_df["pair_id"]
        price_df = price_df.reset_index()
        column_order = ('ticker', 'timestamp', 'open', 'high', 'low', 'close', 'volume', 'link', 'pair_id',)
        price_df = price_df.reindex(columns=column_order)  # Sort columns in a specific order
        price_df.to_csv(
            price_output_fname,
        )
        print(f"Wrote {price_output_fname}, {price_output_fname.stat().st_size:,} bytes")

    :param df:
        Price dataframe with OHLCV data.

        May contain columns named open, close, high, low, volume and timestamp.

        For multipair data this must be `DataFrameGroupBy`.

    :param freq:
        The incoming Pandas frequency of the data, e.g. "d" for daily.

        If the incoming data frequency and `freq` parameter do not match, the data is resampled to the given frequency.

    :param fix_wick_threshold:
        Apply abnormal high/low wick fix filter.

        Percent value of maximum allowed high/low wick relative to close.
        By default fix values where low is 90% lower than close and high is 90% higher than close.

        See :py:func:`fix_bad_wicks` for more information.

    :param bad_open_close_threshold:
        See :py:func:`fix_bad_wicks`.

    :param remove_candles_with_zero:
        Remove candles with zero values for OHLC.

        To deal with abnormal data.

    :param forward_fill:
        Forward-fill gaps in the data.

        Forward-filling data will delete any unknown columns,
        see :py:func:`tradingstrategy.utils.forward_fill.forward_fill` details.

    :return:
        Fixed data frame.

        If forward fill is used, all other columns outside OHLCV are dropped.
    """
    assert isinstance(df, (pd.DataFrame, DataFrameGroupBy)), f"Got: {df.__class__}"

    if isinstance(df, DataFrameGroupBy):
        raw_df = df.obj
    else:
        raw_df = df

    # fix_bad_wicks() mutates the underlying dataframe in place,
    # so these corrections are visible through the groupby as well
    if fix_wick_threshold or bad_open_close_threshold:
        raw_df = fix_bad_wicks(
            raw_df,
            fix_wick_threshold,
            bad_open_close_threshold=bad_open_close_threshold,
        )

    if remove_candles_with_zero:
        # Returns a new, filtered dataframe (does not mutate in place)
        raw_df = remove_zero_candles(raw_df)

    if forward_fill:
        assert freq, "freq argument must be given if forward_fill=True"
        if isinstance(df, DataFrameGroupBy):
            # NOTE(review): the original groupby still references the unfiltered
            # dataframe, so rows dropped by remove_zero_candles() above are not
            # excluded from forward filling here. Regrouping would need the
            # grouping key - confirm whether this is an issue for callers.
            return _forward_fill(df, freq)
        # Forward fill the cleaned dataframe, not the original input,
        # so zero-candle removal is not silently undone
        return _forward_fill(raw_df, freq)

    return raw_df