Source code for tradingstrategy.transport.jsonl

"""Load candle and liquidity data over the Trading Strategy real-time API.

Data is streamed using the `JSONL transport <https://tradingstrategy.ai/api/explorer/>`_.
This method does not require an API key at the moment.
"""
import logging
from collections import defaultdict

import pandas as pd

import datetime
from typing import Optional, Dict, Set

import requests
import jsonlines
from numpy import NaN
from tqdm_loggable.auto import tqdm

from tradingstrategy.candle import Candle
from tradingstrategy.timebucket import TimeBucket
from tradingstrategy.utils.time import to_int_unix_timestamp, naive_utcnow, naive_utcfromtimestamp

logger = logging.getLogger(name=__name__)


#: Column name mappings from JSONL data to our :py:class:`~tradingstrategy.candle.Candle`.
#:
#: Keys are the compact field names used in the JSONL stream, values are the
#: DataFrame column names. A value of ``None`` means the field is dropped.
CANDLE_MAPPINGS: Dict[str, Optional[str]] = dict(
    ci=None,  # chain_id - dropped
    ei=None,  # exchange_id - dropped
    p="pair_id",
    ts="timestamp",
    o="open",
    h="high",
    l="low",
    c="close",
    xr="exchange_rate",
    b="buys",
    s="sells",
    bv="buy_volume",
    sv="sell_volume",
    sb="start_block",
    eb="end_block",
    tc=None,  # Currently not available for Uni v3 exchanges
    v="volume",  # TODO deprecate
)


class JSONLMaxResponseSizeExceeded(Exception):
    """The JSONL stream was terminated by the server with an error item.

    Named after the expected cause: requesting more JSONL data than the
    server is willing to return in a single response.
    """


class NoJSONLData(Exception):
    """The JSONL endpoint responded, but the stream contained no data items."""


def load_trading_strategy_like_jsonl_data(
    session: requests.Session,
    api_url: str,
    pair_ids: Set[int],
    time_bucket: TimeBucket,
    mappings: Dict[str, Optional[str]],
    start_time: Optional[datetime.datetime] = None,
    end_time: Optional[datetime.datetime] = None,
    max_bytes: Optional[int] = None,
    progress_bar_description: Optional[str] = None,
) -> pd.DataFrame:
    """Read data from JSONL endpoint.

    `See OpenAPI spec for details on the format <https://tradingstrategy.ai/api/explorer/>`_.

    Can be used to load

    - OHLCV candles

    - Liquidity data

    Calling this function may consume up to few hundred megabytes of memory
    depending on the response size.

    Display a progress bar using :py:mod:`tqdm` when ``progress_bar_description``
    is given.

    :param session:
        HTTP session used to stream the response.

    :param api_url:
        Which Trading Strategy API we call

    :param pair_ids:
        Trading pairs we query data for

    :param time_bucket:
        Candle time frame

    :param mappings:
        Mapping between JSONL object keys and DataFrame columns.
        Keys mapped to ``None`` are dropped. Every key the server sends
        must be present in the mapping.

    :param start_time:
        Sent to the server as the ISO 8601 ``start`` parameter, if given.

    :param end_time:
        Sent to the server as the ISO 8601 ``end`` parameter, if given.
        Also the right edge of the progress bar; ``naive_utcnow()`` is used
        when not given.

    :param max_bytes:
        Sent to the server as the ``max_bytes`` parameter, if given.

    :param progress_bar_description:
        Progress bar label. No progress bar is shown when not given.

    :return:
        A new DataFrame with one column per mapped key and one row per JSONL item.

    :raise JSONLMaxResponseSizeExceeded:
        If the stream contains an item with an ``error`` key.

    :raise NoJSONLData:
        If the server returned no items.

    :raise RuntimeError:
        If the collected columns have different lengths, i.e. the items did
        not all carry the same set of keys.

    :raise KeyError:
        If an item contains a key that is not in ``mappings``.
    """
    params = _build_jsonl_query_params(pair_ids, time_bucket, start_time, end_time, max_bytes)

    logger.info("Loading JSON data, endpoint:%s, params:%s", api_url, params)

    candle_data = defaultdict(list)

    # Progress is measured in seconds of data processed:
    # from the first item's timestamp up to end_time (or now).
    progress_bar_start = None
    progress_bar_end = to_int_unix_timestamp(end_time or naive_utcnow())
    progress_bar = None
    last_ts = None
    refresh_rate = 200  # Update the progress bar for every N items

    try:
        # The context manager releases the streamed connection even when
        # parsing fails or we raise midway through the stream.
        with session.get(api_url, params=params, stream=True) as resp:
            # resp.raw hands out the undecoded byte stream by default;
            # let urllib3 undo any gzip/deflate Content-Encoding first.
            resp.raw.decode_content = True
            reader = jsonlines.Reader(resp.raw)

            # Massage the format good for pandas
            for idx, item in enumerate(reader):
                # Stream terminated
                if "error" in item:
                    raise JSONLMaxResponseSizeExceeded(str(item))

                current_ts = item["ts"]

                # Set progress bar start to the first timestamp
                if progress_bar_start is None and progress_bar_description:
                    progress_bar_start = current_ts
                    logger.debug("First candle timestamp at %s", current_ts)
                    assert progress_bar_start <= progress_bar_end, f"Mad progress bar {progress_bar_start} - {progress_bar_end}"
                    progress_bar = tqdm(desc=progress_bar_description, total=progress_bar_end - progress_bar_start)

                # Translate the raw compressed keys to our internal Pandas keys;
                # keys mapped to None are deprecated/discarded.
                for key, value in item.items():
                    translated_key = mappings[key]
                    if translated_key is not None:
                        candle_data[translated_key].append(value)

                if idx % refresh_rate == 0:
                    # tqdm truthiness is False for a zero-length bar, so the
                    # bar is only advanced when it has something to show.
                    if last_ts is not None and progress_bar:
                        progress_bar.update(current_ts - last_ts)
                        progress_bar.set_postfix({"Currently at": naive_utcfromtimestamp(current_ts)})
                    last_ts = current_ts

        _check_column_lengths(candle_data)

        if progress_bar:
            # Jump to 100% - https://stackoverflow.com/a/45808255/315168
            progress_bar.update(progress_bar.total - progress_bar.n)
    finally:
        # Always tear the bar down, also on errors and zero-length bars
        if progress_bar is not None:
            progress_bar.close()

    if not candle_data:
        raise NoJSONLData(f"Did not get any data, url:{api_url}, params:{params}")

    df = pd.DataFrame.from_dict(candle_data)
    logger.debug("Loaded %d rows", len(df))
    return df


def _build_jsonl_query_params(
    pair_ids: Set[int],
    time_bucket: TimeBucket,
    start_time: Optional[datetime.datetime],
    end_time: Optional[datetime.datetime],
    max_bytes: Optional[int],
) -> dict:
    """Build the query string parameters for a JSONL endpoint request.

    Optional parameters are only included when they are set.
    """
    params = {
        "pair_ids": ",".join(str(i) for i in pair_ids),
        "time_bucket": time_bucket.value,
    }

    if start_time:
        params["start"] = start_time.isoformat()

    if end_time:
        params["end"] = end_time.isoformat()

    if max_bytes:
        params["max_bytes"] = max_bytes

    return params


def _check_column_lengths(columns: Dict[str, list]) -> None:
    """Verify that every collected column has the same number of values.

    :raise RuntimeError:
        On the first column whose length differs from the first column's length.
    """
    assumed_length = None
    previous_key = None
    for key, values in columns.items():
        current_array_len = len(values)
        if assumed_length is None:
            assumed_length = current_array_len
        elif assumed_length != current_array_len:
            raise RuntimeError(f"Bad JSONL data. The length for {previous_key} was {assumed_length}, but the length for {key} is {current_array_len}")
        previous_key = key
def load_candles_jsonl(
    session: requests.Session,
    server_url: str,
    pair_ids: Set[int],
    time_bucket: TimeBucket,
    start_time: Optional[datetime.datetime] = None,
    end_time: Optional[datetime.datetime] = None,
    max_bytes: Optional[int] = None,
    progress_bar_description: Optional[str] = None,
    sanity_check_count=75,
) -> pd.DataFrame:
    """Load candles using JSON API and produce a DataFrame.

    All requested pairs are fetched with a single streamed request to the
    ``/candles-jsonl`` endpoint and collected into one DataFrame.

    See :py:mod:`tradingstrategy.candle` for candle format description.

    :param session:
        HTTP session used to stream the response.

    :param server_url:
        Base URL of the Trading Strategy API; ``/candles-jsonl`` is appended.

    :param pair_ids:
        Trading pairs we load candles for.

    :param time_bucket:
        Candle time frame.

    :param start_time:
        Optional start of the queried range.

    :param end_time:
        Optional end of the queried range.

    :param max_bytes:
        Optional response size limit passed to the server.

    :param progress_bar_description:
        Progress bar label

    :param sanity_check_count:
        Don't accidentally try to load too many pairs.
        Never exceed this value.

    :raise JSONLMaxResponseSizeExceeded:
        If the max_bytes limit is breached

    :raise NoJSONLData:
        If the server returned no candles.

    :return:
        DataFrame with candle data for the given pairs, indexed by timestamp.
        The ``timestamp`` column is also kept as a regular column.
    """
    # "Never exceed" - loading exactly sanity_check_count pairs is allowed
    assert len(pair_ids) <= sanity_check_count, f"We attempt to load data for trading pairs {len(pair_ids)}, but you probably don't want to use this data fetch method for more than {sanity_check_count} pairs"

    api_url = f"{server_url}/candles-jsonl"

    df = load_trading_strategy_like_jsonl_data(
        session,
        api_url,
        pair_ids,
        time_bucket,
        CANDLE_MAPPINGS,
        start_time=start_time,
        end_time=end_time,
        max_bytes=max_bytes,
        progress_bar_description=progress_bar_description,
    )

    # The avg column is not supported at the moment.
    # float("nan") rather than numpy.NaN: that alias was removed in NumPy 2.0.
    df["avg"] = float("nan")

    if "volume" not in df.columns:
        # Reconstruct normal volume column as expected for OHLCV data
        df["volume"] = df["buy_volume"] + df["sell_volume"]

    df = df.astype(Candle.DATAFRAME_FIELDS)

    # Convert JSONL unix timestamps to Pandas
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")

    # Assume candles are always indexed by their timestamp
    df.set_index("timestamp", inplace=True, drop=False)
    return df