# Source code for tradingstrategy.transport.data_trigger

import abc
import datetime
import logging
import time
from dataclasses import dataclass
from typing import Set

from requests import ConnectionError as RequestsConnectionError
from requests import HTTPError, Timeout, TooManyRedirects

from tradingstrategy.client import Client
from tradingstrategy.timebucket import TimeBucket
from tradingstrategy.types import PrimaryKey


logger = logging.getLogger(__name__)


#: Errors that are likely caused by flaky internet connection
#: and we should retry.
#:
#: ``requests.ConnectionError`` derives from ``requests.RequestException``
#: (an ``OSError``) and is *not* a subclass of the builtin ``ConnectionError``,
#: so both classes are listed: the builtin one for raw socket level failures
#: and the requests one for failures reported by the HTTP library.
RETRYABLE_EXCEPTIONS = (
    ConnectionError,
    RequestsConnectionError,
    HTTPError,
    Timeout,
    TooManyRedirects,
)


def wait_for_data(
    client: Client,
    pair_ids: Set[PrimaryKey],
    tick: datetime.datetime,
    bucket: TimeBucket,
    maximum_wait: datetime.timedelta,
    sleep: datetime.timedelta = datetime.timedelta(seconds=5),
    retryable_exceptions=RETRYABLE_EXCEPTIONS,
):
    """Wait until the candle data is available.

    We wait until we have the data for the previous candle available.
    The candle that precedes ``tick`` by one ``bucket`` is polled through
    ``client.fetch_candles_by_pair_ids()`` until the call returns a
    non-empty result or ``maximum_wait`` has elapsed.

    :param client:
        Client whose ``fetch_candles_by_pair_ids()`` is polled.

    :param pair_ids:
        Pairs passed to the candle query.

    :param tick:
        Timestamp of the current tick. The polled candle starts one
        ``bucket`` before this.

    :param bucket:
        Candle time bucket; its ``to_pandas_timedelta()`` gives the
        candle length.

    :param maximum_wait:
        How long to keep polling before giving up.

    :param sleep:
        Delay between two polls.

    :param retryable_exceptions:
        Exception class or tuple of classes (anything ``isinstance()``
        accepts) for which the fetch is retried instead of aborted.

    :return:
        Whatever ``fetch_candles_by_pair_ids()`` returned once it was
        non-empty, or ``None`` if the deadline passed without data.

    :raise RuntimeError:
        If the fetch raises an exception that is not listed in
        ``retryable_exceptions``. The original error is chained as the cause.
    """

    # Query only the latest candle from JSONL endpoint
    candle_ts = tick - bucket.to_pandas_timedelta()
    start_ts = candle_ts - datetime.timedelta(seconds=1)
    end_ts = candle_ts

    logger.info("Waiting for strategy data for tick: %s, candle: %s", tick, candle_ts)

    # NOTE(review): naive_utcnow is not imported in the visible part of this
    # module - confirm it is in scope before relying on this function.
    deadline = naive_utcnow() + maximum_wait

    attempts = 1
    while naive_utcnow() < deadline:
        logger.info("Starting to check the data age, attempt #%d", attempts)
        try:
            candles = client.fetch_candles_by_pair_ids(
                pair_ids,
                bucket,
                start_ts,
                end_ts,
            )
        except Exception as e:
            if not isinstance(e, retryable_exceptions):
                raise RuntimeError("Data wait fetch loop aborted") from e
            logger.warning("Candle fetch attempt #%d failed, retrying: %s", attempts, e)
            candles = None

        # Stop polling as soon as the endpoint hands back some data.
        # NOTE(review): assumes the result is a sized container (presumably a
        # DataFrame); a stricter per-pair completeness check may be wanted.
        if candles is not None and len(candles) > 0:
            return candles

        time.sleep(sleep.total_seconds())
        attempts += 1

    logger.warning(
        "Gave up waiting for candle %s after %d attempts",
        candle_ts,
        attempts - 1,
    )
    return None