Source code for tradeexecutor.state.store

"""State serialisation to disk and JavaScript clients."""
import abc
import datetime
import enum
import json
import os
import shutil
import tempfile
from pathlib import Path
import logging
from pprint import pprint
from typing import Union, Optional, Callable

from dataclasses_json.core import _ExtendedEncoder

from tradeexecutor.state.state import State
from tradeexecutor.state.validator import validate_nested_state_dict

logger = logging.getLogger(__name__)


class StateStoreModel(enum.Enum):
    """Enumerates the ways the algorithm execution state can be stored."""

    # Each member's value is the same string as its name.
    file = "file"
    on_chain = "on_chain"
class StateStore(abc.ABC):
    """Abstract backend managing the persistent trade execution state."""

    @abc.abstractmethod
    def is_pristine(self) -> bool:
        """Tell whether the state has not been written yet."""

    @abc.abstractmethod
    def load(self) -> State:
        """Read the state back from the storage."""

    @abc.abstractmethod
    def sync(self, state: State):
        """Write the given state to the storage."""

    @abc.abstractmethod
    def create(self, name: str) -> State:
        """Create a new state storage.

        Although abstract, this method carries a default implementation
        that concrete stores can reach through ``super().create(name)``.

        :param name:
            Name of the strategy this State belongs to

        :return:
            A newly constructed State with its ``name`` attribute set
        """
        fresh_state = State()
        fresh_state.name = name
        return fresh_state
class JSONFileStore(StateStore):
    """Store the state of the executor as a JSON file.

    - Read by strategy on a startup

    - Read by webhook when asked over the API
    """

    def __init__(self, path: Union[Path, str], on_save: Optional[Callable] = None):
        """
        :param path:
            Path to the JSON file. Strings are converted to :py:class:`Path`.

        :param on_save:
            Save hook, called with the state after each successful
            :py:meth:`sync`. Used by `RunState.read_only_state_copy`
        """
        assert path
        if not isinstance(path, Path):
            path = Path(path)
        self.path = path
        self.on_save = on_save

    def __repr__(self):
        path = os.path.abspath(self.path)
        return f"<JSON file at {path}>"

    def is_pristine(self) -> bool:
        """The store is pristine when the JSON file does not exist yet."""
        return not self.path.exists()

    def load(self) -> State:
        """Read the state from the JSON file.

        :return:
            State as returned by ``State.read_json_file``
        """
        state = State.read_json_file(self.path)
        # Log only after the read succeeded, so the message is never
        # emitted for a load that raised.
        logger.info("Loaded state from %s", self.path)
        return state

    def sync(self, state: State):
        """Write new JSON state dump using an atomic file replacement.

        The state is validated and serialised in memory first, then written
        to a temporary file in the same directory as the target and finally
        moved over the target, so a failed save never leaves a half-written
        state file or a stray temporary file behind.

        :param state:
            State to save. Its ``last_updated_at`` is set to the current
            naive UTC time as a side effect.

        :raise TypeError:
            If the state contains objects ``json.dumps`` cannot serialise
        """
        state.last_updated_at = datetime.datetime.utcnow()

        # Insert special validation logic here to have
        # friendly error messages for the JSON serialisation errors
        data = state.to_dict(encode_json=False)
        validate_nested_state_dict(data)

        try:
            txt = json.dumps(data, cls=_ExtendedEncoder)
        except TypeError as e:
            # Add some helpful debug info.
            # The usual cause of state serialisation failure is having
            # non-JSON objects in the state.
            logger.error("State serialisation failed: %s", e)
            pprint(state.to_dict())
            raise

        # The temporary file lives next to the target so that the final
        # replace is a rename within one directory, not a copy.
        dirname = os.path.dirname(self.path)
        temp = tempfile.NamedTemporaryFile(mode="wt", delete=False, dir=dirname)
        try:
            # Leaving the with block closes (and thus flushes) the file
            # before it is moved into place.
            with temp as out:
                out.write(txt)
            os.replace(temp.name, self.path)
        except BaseException:
            # Best-effort clean up; the original error is what matters.
            try:
                os.unlink(temp.name)
            except OSError:
                pass
            raise

        logger.info("Saved state to %s, total %s chars", self.path, f"{len(txt):,}")

        if self.on_save:
            self.on_save(state)

    def create(self, name: str) -> State:
        """Create a new in-memory state for the strategy.

        Only logs and builds the state object; nothing is written to
        the file here.

        :param name:
            Name of the strategy this State belongs to
        """
        logger.info("Created new state for the strategy %s at %s", name, os.path.realpath(self.path))
        return super().create(name)
class NoneStore(StateStore):
    """Store that is not persistent.

    Used in unit tests. Seed with initial state.
    """

    def __init__(self, state: Optional[State] = None):
        """
        :param state:
            Initial state to hold. A new empty State is created when omitted.
        """
        self.created = False
        # Compare against None explicitly so that a caller-supplied state
        # is never replaced just because it evaluates as falsy.
        if state is None:
            state = State()
        self.state = state

    def is_pristine(self) -> bool:
        """Always False: the store holds a state from construction."""
        return False

    def load(self) -> State:
        """Return the in-memory state held by this store."""
        return self.state

    def sync(self, state: State):
        """Do not persist anything.

        Only the ``last_updated_at`` timestamp of the given state is
        refreshed to the current naive UTC time.
        """
        state.last_updated_at = datetime.datetime.utcnow()

    def create(self, name: Optional[str] = None) -> State:
        """Always refuse to create a new state.

        :param name:
            Accepted for compatibility with ``StateStore.create(name)`` so
            that callers using the base interface get the explanatory error
            below instead of a ``TypeError`` about unexpected arguments.
            The value is ignored.

        :raise NotImplementedError:
            Always
        """
        raise NotImplementedError("This should not be called for NoneStore.\n"
                                  "Backtest have explicit state set for them at the start that should not be cleared.")