diff --git a/.env.example b/.env.example index 200898b..3088d15 100644 --- a/.env.example +++ b/.env.example @@ -44,6 +44,7 @@ FIRSTRADE_PERSIST_STRATEGY_RUNS=false FIRSTRADE_ENABLE_LIVE_TRADING=false FIRSTRADE_RUN_SMOKE_ON_HTTP=false FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP=false +FIRSTRADE_SESSION_CHECK_POLICY=auto FIRSTRADE_SESSION_CHECK_INCLUDE_POSITIONS=false FIRSTRADE_RUN_STRATEGY_ON_HTTP=false FIRSTRADE_LIVE_ORDER_ACK=false diff --git a/README.md b/README.md index 6c14d10..3eeb492 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,7 @@ commit credentials. | `FIRSTRADE_ENABLE_LIVE_TRADING` | Optional | Must be `true` before any live order can be submitted | | `FIRSTRADE_RUN_SMOKE_ON_HTTP` | Optional | Must be `true` before `/smoke` performs a real login/quote | | `FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP` | Optional | Must be `true` before `/session-check` performs a read-only login/session/account-state check | +| `FIRSTRADE_SESSION_CHECK_POLICY` | Optional | `/session-check` maintenance cadence: `auto`, `always`, or `skip`. Defaults to `auto`; monthly snapshot strategies run at most once per month when GCS state is available, while daily strategies run every check | | `FIRSTRADE_SESSION_CHECK_INCLUDE_POSITIONS` | Optional | Include compact symbol/quantity/market-value positions in `/session-check` funds snapshots. Defaults to `false` | | `FIRSTRADE_RUN_STRATEGY_ON_HTTP` | Optional | Must be `true` before `/run` performs strategy evaluation and order routing | | `FIRSTRADE_LIVE_ORDER_ACK` | Optional | Must be `true` before `/run` can submit live orders | @@ -211,10 +212,16 @@ sessions from another device, or broker-side invalidation still fall back to a fresh login. `/session-check` is a read-only route for session keepalive experiments and -account-state persistence. It connects to Firstrade, selects the account, reads -balances, optionally reads positions, and returns a compact masked funds -snapshot. With `FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT=true`, it writes the snapshot -to `accounts//funds/latest.json` plus a timestamped history path +account-state persistence. With `FIRSTRADE_SESSION_CHECK_POLICY=auto`, it reads +the configured strategy cadence before connecting. Daily strategies and profiles +with daily canary checks run every time. Monthly snapshot strategies run once per +calendar month when `FIRSTRADE_GCS_STATE_BUCKET` is available; otherwise the +route runs conservatively instead of skipping. A skipped check returns +`session_check_skipped=true` and does not create a Firstrade client. When the +check runs, it connects to Firstrade, selects the account, reads balances, +optionally reads positions, and returns a compact masked funds snapshot. With +`FIRSTRADE_PERSIST_ACCOUNT_SNAPSHOT=true`, it writes the snapshot to +`accounts//funds/latest.json` plus a timestamped history path under the configured GCS prefix. Raw account IDs and login secrets are not included in the snapshot. @@ -337,11 +344,14 @@ runtime service account object read/write access, and set: - `FIRSTRADE_GCS_STATE_BUCKET=` - `FIRSTRADE_STATE_PREFIX=firstrade-platform` - `FIRSTRADE_RUN_SESSION_CHECK_ON_HTTP=true` +- `FIRSTRADE_SESSION_CHECK_POLICY=auto` The `/session-check` scheduler can safely run more often than the strategy -scheduler because it is read-only. A typical test schedule is every 30 minutes -during US regular market hours. The route logs `session_reused=true|false` and -writes the latest masked funds snapshot plus timestamped history to GCS. +scheduler because it is read-only. With the default `auto` policy, monthly +snapshot strategies only perform real session maintenance once per month after +the first successful check writes its maintenance marker to GCS. Daily strategies +still run every scheduler hit. The route logs `session_reused=true|false` for +real checks and `Firstrade session-check skipped` for cadence-based skips. ## License And Upstream Compliance diff --git a/application/session_check_service.py b/application/session_check_service.py index 21bb167..5572d6b 100644 --- a/application/session_check_service.py +++ b/application/session_check_service.py @@ -2,8 +2,10 @@ from __future__ import annotations +import json import os from collections.abc import Callable, Mapping +from dataclasses import dataclass from datetime import datetime, timezone from typing import Any @@ -20,6 +22,11 @@ mask_account_id, ) from application.state_persistence import GcsStateStore, build_gcs_state_store_from_env +from strategy_registry import ( + FIRSTRADE_PLATFORM, + resolve_strategy_definition, + resolve_strategy_metadata, +) BALANCE_KEYWORDS = ( "cash", @@ -32,6 +39,43 @@ "value", "margin", ) +_FEATURE_SNAPSHOT_INPUT = "feature_snapshot" +_SESSION_CHECK_POLICY_DEFAULT = "auto" +_SESSION_CHECK_POLICY_SKIP_VALUES = frozenset({"skip", "never", "disabled", "off", "false", "0"}) +_SESSION_CHECK_POLICY_ALWAYS_VALUES = frozenset({"always", "run", "true", "1"}) + + +@dataclass(frozen=True) +class SessionCheckMaintenanceDecision: + should_run: bool + policy: str + strategy_profile: str | None = None + strategy_cadence: str | None = None + strategy_required_inputs: tuple[str, ...] = () + period: str | None = None + state_key: str | None = None + reason: str = "" + last_checked_at: str | None = None + diagnostic_error: str | None = None + + def to_response_fields(self) -> dict[str, Any]: + fields: dict[str, Any] = { + "session_check_policy": self.policy, + "session_check_policy_reason": self.reason, + } + if self.strategy_profile: + fields["strategy_profile"] = self.strategy_profile + if self.strategy_cadence: + fields["strategy_cadence"] = self.strategy_cadence + if self.strategy_required_inputs: + fields["strategy_required_inputs"] = list(self.strategy_required_inputs) + if self.period: + fields["session_check_period"] = self.period + if self.last_checked_at: + fields["session_check_last_checked_at"] = self.last_checked_at + if self.diagnostic_error: + fields["session_check_policy_error"] = self.diagnostic_error + return fields def _flag(name: str, default: str = "false", env: Callable[[str, str | None], str | None] = os.getenv) -> bool: @@ -46,6 +90,218 @@ def _safe_key(value: str) -> str: return "".join(ch if ch.isalnum() else "_" for ch in str(value or "")) or "unknown" +def _normalize_session_check_policy(raw_value: str | None) -> str: + value = str(raw_value or _SESSION_CHECK_POLICY_DEFAULT).strip().lower() + if not value or value == "auto": + return "auto" + if value in _SESSION_CHECK_POLICY_ALWAYS_VALUES: + return "always" + if value in _SESSION_CHECK_POLICY_SKIP_VALUES: + return "skip" + return "auto" + + +def _configured_strategy_profile( + env: Callable[[str, str | None], str | None] = os.getenv, +) -> str | None: + raw_runtime_target = env("RUNTIME_TARGET_JSON", None) + if raw_runtime_target: + payload = json.loads(raw_runtime_target) + if not isinstance(payload, dict): + raise ValueError("RUNTIME_TARGET_JSON must decode to an object") + raw_profile = payload.get("strategy_profile") + profile = str(raw_profile or "").strip() + if profile: + return profile + profile = str(env("STRATEGY_PROFILE", "") or "").strip() + return profile or None + + +def _resolve_strategy_session_check_context( + env: Callable[[str, str | None], str | None] = os.getenv, +) -> tuple[str | None, str | None, tuple[str, ...], str | None]: + try: + raw_profile = _configured_strategy_profile(env) + if not raw_profile: + return None, None, (), "STRATEGY_PROFILE or RUNTIME_TARGET_JSON is not configured" + definition = resolve_strategy_definition(raw_profile, platform_id=FIRSTRADE_PLATFORM) + metadata = resolve_strategy_metadata(definition.profile, platform_id=FIRSTRADE_PLATFORM) + required_inputs = tuple(sorted(str(value) for value in definition.required_inputs)) + return definition.profile, metadata.cadence, required_inputs, None + except Exception as exc: + return None, None, (), f"{type(exc).__name__}: {exc}" + + +def _session_check_period( + *, + cadence: str | None, + required_inputs: tuple[str, ...], + now: datetime, +) -> tuple[str | None, str]: + cadence_text = str(cadence or "").strip().lower() + required_input_set = frozenset(required_inputs) + if "daily" in cadence_text or "intraday" in cadence_text: + return None, "daily_strategy" + if "weekly" in cadence_text: + return f"{now:%G}-W{now:%V}", "weekly_strategy" + if "monthly" in cadence_text: + return now.strftime("%Y-%m"), "monthly_strategy" + if "quarterly" in cadence_text: + quarter = ((now.month - 1) // 3) + 1 + return f"{now.year}-Q{quarter}", "quarterly_strategy" + if _FEATURE_SNAPSHOT_INPUT in required_input_set: + return now.strftime("%Y-%m"), "feature_snapshot_strategy" + return None, "unthrottled_strategy" + + +def _session_check_state_key( + *, + strategy_profile: str, + period: str, + env: Callable[[str, str | None], str | None] = os.getenv, +) -> str: + account_selector = str(env("FIRSTRADE_ACCOUNT", "") or "").strip() + account_key = _safe_key(mask_account_id(account_selector)) if account_selector else "auto" + return ( + f"session-checks/{account_key}/{_safe_key(strategy_profile)}/" + f"{_safe_key(period)}/latest.json" + ) + + +def resolve_session_check_maintenance_decision( + *, + state_store: GcsStateStore | None = None, + env_reader: Callable[[str, str | None], str | None] = os.getenv, + now: datetime | None = None, +) -> SessionCheckMaintenanceDecision: + as_of = now or _utcnow() + policy = _normalize_session_check_policy(env_reader("FIRSTRADE_SESSION_CHECK_POLICY", "auto")) + strategy_profile, cadence, required_inputs, strategy_error = _resolve_strategy_session_check_context( + env_reader + ) + if policy == "skip": + return SessionCheckMaintenanceDecision( + should_run=False, + policy=policy, + strategy_profile=strategy_profile, + strategy_cadence=cadence, + strategy_required_inputs=required_inputs, + reason="policy_skip", + diagnostic_error=strategy_error, + ) + if policy == "always": + return SessionCheckMaintenanceDecision( + should_run=True, + policy=policy, + strategy_profile=strategy_profile, + strategy_cadence=cadence, + strategy_required_inputs=required_inputs, + reason="policy_always", + diagnostic_error=strategy_error, + ) + if strategy_error or not strategy_profile: + return SessionCheckMaintenanceDecision( + should_run=True, + policy=policy, + strategy_profile=strategy_profile, + strategy_cadence=cadence, + strategy_required_inputs=required_inputs, + reason="strategy_context_unavailable", + diagnostic_error=strategy_error, + ) + + period, period_reason = _session_check_period( + cadence=cadence, + required_inputs=required_inputs, + now=as_of, + ) + if period is None: + return SessionCheckMaintenanceDecision( + should_run=True, + policy=policy, + strategy_profile=strategy_profile, + strategy_cadence=cadence, + strategy_required_inputs=required_inputs, + reason=period_reason, + ) + state_key = _session_check_state_key( + strategy_profile=strategy_profile, + period=period, + env=env_reader, + ) + if state_store is None: + return SessionCheckMaintenanceDecision( + should_run=True, + policy=policy, + strategy_profile=strategy_profile, + strategy_cadence=cadence, + strategy_required_inputs=required_inputs, + period=period, + state_key=state_key, + reason="state_store_unavailable", + ) + try: + existing = state_store.read_json(state_key) + except Exception as exc: + return SessionCheckMaintenanceDecision( + should_run=True, + policy=policy, + strategy_profile=strategy_profile, + strategy_cadence=cadence, + strategy_required_inputs=required_inputs, + period=period, + state_key=state_key, + reason="state_read_failed", + diagnostic_error=f"{type(exc).__name__}: {exc}", + ) + if existing: + return SessionCheckMaintenanceDecision( + should_run=False, + policy=policy, + strategy_profile=strategy_profile, + strategy_cadence=cadence, + strategy_required_inputs=required_inputs, + period=period, + state_key=state_key, + reason=period_reason, + last_checked_at=str(existing.get("checked_at") or "") or None, + ) + return SessionCheckMaintenanceDecision( + should_run=True, + policy=policy, + strategy_profile=strategy_profile, + strategy_cadence=cadence, + strategy_required_inputs=required_inputs, + period=period, + state_key=state_key, + reason=f"{period_reason}_due", + ) + + +def persist_session_check_maintenance( + *, + store: GcsStateStore, + decision: SessionCheckMaintenanceDecision, + account: str, + session_reused: bool, + now: datetime | None = None, +) -> bool: + if not decision.state_key: + return False + as_of = now or _utcnow() + payload = { + "checked_at": as_of.isoformat(), + "account": mask_account_id(account), + "session_reused": session_reused, + "strategy_profile": decision.strategy_profile, + "strategy_cadence": decision.strategy_cadence, + "strategy_required_inputs": list(decision.strategy_required_inputs), + "period": decision.period, + "policy": decision.policy, + } + return store.write_json(decision.state_key, payload) + + def _compact_positions(payload: Any) -> list[dict[str, Any]]: positions: list[dict[str, Any]] = [] for row in iter_position_rows(payload): @@ -108,8 +364,30 @@ def run_session_check( env_reader: Callable[[str, str | None], str | None] = os.getenv, now: datetime | None = None, ) -> dict[str, Any]: - resolved_credentials = credentials or FirstradeCredentials.from_env(env_reader) + as_of = now or _utcnow() store = state_store or build_gcs_state_store_from_env(env_reader) + maintenance_decision = resolve_session_check_maintenance_decision( + state_store=store, + env_reader=env_reader, + now=as_of, + ) + if not maintenance_decision.should_run: + print( + "Firstrade session-check skipped " + f"policy={maintenance_decision.policy} " + f"strategy={maintenance_decision.strategy_profile or ''} " + f"period={maintenance_decision.period or ''} " + f"reason={maintenance_decision.reason}", + flush=True, + ) + return { + "ok": True, + "api_kind": "unofficial-reverse-engineered", + "session_check_skipped": True, + **maintenance_decision.to_response_fields(), + } + + resolved_credentials = credentials or FirstradeCredentials.from_env(env_reader) client = client_factory( resolved_credentials, live_trading_enabled=is_live_trading_enabled(env_reader), @@ -129,7 +407,7 @@ def run_session_check( balances=balances, positions_payload=positions_payload, session_reused=session_reused, - now=now, + now=as_of, ) snapshot_persisted = False snapshot_error = None @@ -138,16 +416,30 @@ def run_session_check( snapshot_persisted = persist_funds_snapshot( store=store, snapshot=snapshot, - now=now, + now=as_of, ) except Exception as exc: snapshot_error = f"{type(exc).__name__}: {exc}" + maintenance_state_persisted = False + maintenance_state_error = None + if store is not None and maintenance_decision.state_key: + try: + maintenance_state_persisted = persist_session_check_maintenance( + store=store, + decision=maintenance_decision, + account=account, + session_reused=session_reused, + now=as_of, + ) + except Exception as exc: + maintenance_state_error = f"{type(exc).__name__}: {exc}" print( "Firstrade session-check " f"session_reused={session_reused} " f"account={mask_account_id(account)} " - f"snapshot_persisted={snapshot_persisted}", + f"snapshot_persisted={snapshot_persisted} " + f"maintenance_state_persisted={maintenance_state_persisted}", flush=True, ) result: dict[str, Any] = { @@ -157,7 +449,11 @@ def run_session_check( "session_reused": session_reused, "funds_snapshot": snapshot, "snapshot_persisted": snapshot_persisted, + "session_check_maintenance_state_persisted": maintenance_state_persisted, + **maintenance_decision.to_response_fields(), } if snapshot_error: result["snapshot_error"] = snapshot_error + if maintenance_state_error: + result["session_check_maintenance_state_error"] = maintenance_state_error return result diff --git a/tests/test_session_check_service.py b/tests/test_session_check_service.py index 7fcc5fe..2aa74e5 100644 --- a/tests/test_session_check_service.py +++ b/tests/test_session_check_service.py @@ -40,14 +40,30 @@ def get_positions(self, _account): class FakeStateStore: - def __init__(self): + def __init__(self, reads=None): + self.payloads = dict(reads or {}) + self.reads = [] self.writes = [] + def read_json(self, key): + self.reads.append(key) + return self.payloads.get(key) + def write_json(self, key, payload): self.writes.append((key, payload)) + self.payloads[key] = payload return True +class ExplodingClient: + def __init__(self, *_args, **_kwargs): + raise AssertionError("client should not be created when session-check is skipped") + + +def _env(values): + return lambda name, default=None: values.get(name, default) + + def test_build_account_funds_snapshot_masks_account_and_compacts_values(): snapshot = build_account_funds_snapshot( account="12345678", @@ -91,3 +107,128 @@ def test_run_session_check_persists_funds_snapshot_when_enabled(): assert store.writes[0][0] == "accounts/____5678/funds/latest.json" assert store.writes[1][0] == "accounts/____5678/funds/history/2026/05/23/20260523T010203Z.json" assert store.writes[0][1]["positions"][0]["symbol"] == "SPY" + + +def test_monthly_session_check_skips_when_current_period_is_already_maintained(): + now = datetime(2026, 6, 3, 1, 2, 3, tzinfo=timezone.utc) + state_key = ( + "session-checks/auto/mega_cap_leader_rotation_top50_balanced/2026_06/latest.json" + ) + store = FakeStateStore( + { + state_key: { + "checked_at": "2026-06-01T01:02:03+00:00", + "period": "2026-06", + } + } + ) + + result = run_session_check( + client_factory=ExplodingClient, + state_store=store, + env_reader=_env({"STRATEGY_PROFILE": "mega_cap_leader_rotation_top50_balanced"}), + now=now, + ) + + assert result["ok"] is True + assert result["session_check_skipped"] is True + assert result["session_check_policy"] == "auto" + assert result["session_check_period"] == "2026-06" + assert result["session_check_last_checked_at"] == "2026-06-01T01:02:03+00:00" + assert store.reads == [state_key] + assert store.writes == [] + + +def test_monthly_session_check_runs_and_persists_maintenance_state_when_due(): + now = datetime(2026, 6, 3, 1, 2, 3, tzinfo=timezone.utc) + store = FakeStateStore() + + result = run_session_check( + credentials=FirstradeCredentials(username="user", password="pass"), + client_factory=FakeClient, + state_store=store, + env_reader=_env({"STRATEGY_PROFILE": "mega_cap_leader_rotation_top50_balanced"}), + now=now, + ) + + assert result["ok"] is True + assert result["session_check_maintenance_state_persisted"] is True + state_key = ( + "session-checks/auto/mega_cap_leader_rotation_top50_balanced/2026_06/latest.json" + ) + assert store.reads == [state_key] + assert store.writes == [ + ( + state_key, + { + "checked_at": "2026-06-03T01:02:03+00:00", + "account": "****5678", + "session_reused": True, + "strategy_profile": "mega_cap_leader_rotation_top50_balanced", + "strategy_cadence": "monthly", + "strategy_required_inputs": ["feature_snapshot"], + "period": "2026-06", + "policy": "auto", + }, + ) + ] + + +def test_daily_session_check_runs_every_time_without_maintenance_state_lookup(): + now = datetime(2026, 6, 3, 1, 2, 3, tzinfo=timezone.utc) + store = FakeStateStore() + + result = run_session_check( + credentials=FirstradeCredentials(username="user", password="pass"), + client_factory=FakeClient, + state_store=store, + env_reader=_env({"STRATEGY_PROFILE": "tqqq_growth_income"}), + now=now, + ) + + assert result["ok"] is True + assert result["session_check_policy_reason"] == "daily_strategy" + assert result["session_check_maintenance_state_persisted"] is False + assert store.reads == [] + assert store.writes == [] + + +def test_session_check_policy_always_overrides_monthly_throttle(): + now = datetime(2026, 6, 3, 1, 2, 3, tzinfo=timezone.utc) + state_key = ( + "session-checks/auto/mega_cap_leader_rotation_top50_balanced/2026_06/latest.json" + ) + store = FakeStateStore({state_key: {"checked_at": "2026-06-01T01:02:03+00:00"}}) + + result = run_session_check( + credentials=FirstradeCredentials(username="user", password="pass"), + client_factory=FakeClient, + state_store=store, + env_reader=_env( + { + "STRATEGY_PROFILE": "mega_cap_leader_rotation_top50_balanced", + "FIRSTRADE_SESSION_CHECK_POLICY": "always", + } + ), + now=now, + ) + + assert result["ok"] is True + assert result["session_check_policy"] == "always" + assert result["session_check_policy_reason"] == "policy_always" + assert result["session_check_maintenance_state_persisted"] is False + assert store.reads == [] + assert store.writes == [] + + +def test_session_check_policy_skip_does_not_require_credentials_or_client(): + result = run_session_check( + client_factory=ExplodingClient, + env_reader=_env({"FIRSTRADE_SESSION_CHECK_POLICY": "skip"}), + now=datetime(2026, 6, 3, 1, 2, 3, tzinfo=timezone.utc), + ) + + assert result["ok"] is True + assert result["session_check_skipped"] is True + assert result["session_check_policy"] == "skip" + assert result["session_check_policy_reason"] == "policy_skip"