From e25621c91a1c31a1131161007beeac5292e2f34f Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Tue, 2 Jun 2026 20:15:10 +0800 Subject: [PATCH] Add execution report heartbeat alerts --- .../workflows/execution-report-heartbeat.yml | 61 +++ README.md | 14 + scripts/execution_report_heartbeat.py | 422 ++++++++++++++++++ 3 files changed, 497 insertions(+) create mode 100644 .github/workflows/execution-report-heartbeat.yml create mode 100644 scripts/execution_report_heartbeat.py diff --git a/.github/workflows/execution-report-heartbeat.yml b/.github/workflows/execution-report-heartbeat.yml new file mode 100644 index 0000000..a4fff88 --- /dev/null +++ b/.github/workflows/execution-report-heartbeat.yml @@ -0,0 +1,61 @@ +name: Execution Report Heartbeat + +on: + workflow_dispatch: + inputs: + lookback_hours: + description: "Report lookback window in hours." + required: false + type: string + default: "36" + fail_workflow_on_alert: + description: "Fail this workflow when an alert is emitted." + required: false + type: choice + default: "true" + options: + - "true" + - "false" + schedule: + - cron: "10 23 * * 1-5" + +env: + GCP_PROJECT_ID: firstradequant + GCP_WORKLOAD_IDENTITY_PROVIDER: projects/1088907247379/locations/global/workloadIdentityPools/github-actions/providers/github-main + GCP_WORKLOAD_IDENTITY_SERVICE_ACCOUNT: firstrade-platform-deploy@firstradequant.iam.gserviceaccount.com + +jobs: + heartbeat: + name: Check execution report heartbeat + runs-on: ubuntu-latest + permissions: + contents: read + id-token: write + env: + RUNTIME_HEARTBEAT_NAME: FirstradePlatform + RUNTIME_HEARTBEAT_LOOKBACK_HOURS: ${{ inputs.lookback_hours || vars.RUNTIME_HEARTBEAT_LOOKBACK_HOURS || '36' }} + RUNTIME_HEARTBEAT_FAIL_WORKFLOW_ON_ALERT: ${{ inputs.fail_workflow_on_alert || vars.RUNTIME_HEARTBEAT_FAIL_WORKFLOW_ON_ALERT || 'true' }} + RUNTIME_HEARTBEAT_ACCEPT_STAGES: ${{ vars.RUNTIME_HEARTBEAT_ACCEPT_STAGES }} + RUNTIME_HEARTBEAT_REJECT_STAGES: ${{ vars.RUNTIME_HEARTBEAT_REJECT_STAGES }} + FIRSTRADE_GCS_STATE_BUCKET: ${{ vars.FIRSTRADE_GCS_STATE_BUCKET }} + FIRSTRADE_STATE_PREFIX: ${{ vars.FIRSTRADE_STATE_PREFIX }} + GLOBAL_TELEGRAM_CHAT_ID: ${{ vars.GLOBAL_TELEGRAM_CHAT_ID }} + CRISIS_ALERT_TELEGRAM_CHAT_IDS: ${{ vars.CRISIS_ALERT_TELEGRAM_CHAT_IDS }} + CRISIS_ALERT_TELEGRAM_API_BASE_URL: ${{ vars.CRISIS_ALERT_TELEGRAM_API_BASE_URL }} + TELEGRAM_TOKEN: ${{ secrets.TELEGRAM_TOKEN }} + CRISIS_ALERT_TELEGRAM_BOT_TOKEN: ${{ secrets.CRISIS_ALERT_TELEGRAM_BOT_TOKEN }} + steps: + - name: Checkout repository + uses: actions/checkout@v6 + + - name: Authenticate to Google Cloud + uses: google-github-actions/auth@v3 + with: + workload_identity_provider: ${{ env.GCP_WORKLOAD_IDENTITY_PROVIDER }} + service_account: ${{ env.GCP_WORKLOAD_IDENTITY_SERVICE_ACCOUNT }} + + - name: Set up gcloud + uses: google-github-actions/setup-gcloud@v3 + + - name: Check recent execution report + run: python scripts/execution_report_heartbeat.py diff --git a/README.md b/README.md index f078c79..6c14d10 100644 --- a/README.md +++ b/README.md @@ -292,6 +292,14 @@ set `RUNTIME_GUARD_REQUIRE_SUCCESS=true` and choose Scheduler run. The default leaves that heartbeat check off to avoid false alerts outside trading windows. +`Execution Report Heartbeat` (`.github/workflows/execution-report-heartbeat.yml`) +is the stricter completion check. It runs on weekdays after the expected US +window and verifies that a recent strategy-run JSON exists under +`FIRSTRADE_GCS_STATE_BUCKET` / `FIRSTRADE_STATE_PREFIX`. It reads the latest +report status/stage and alerts if no recent report exists or the latest reports +are error-like. The deploy service account needs object read/list access on the +state bucket. + ## Cloud Run Shape `main.py` exposes: @@ -438,6 +446,12 @@ Run、OIDC/IAM/audience 配错、Cloud Run 返回 4xx/5xx,或容器启动/导 `RUNTIME_GUARD_REQUIRE_SUCCESS=true`,并把 `RUNTIME_GUARD_LOOKBACK_MINUTES` 设成覆盖 Firstrade 预期 Scheduler 运行时间的窗口。默认不强制心跳,避免非交易窗口误报。 +更严格的完成检查是 `Execution Report Heartbeat` +(`.github/workflows/execution-report-heartbeat.yml`)。它会在工作日美股预期窗口后检查 +`FIRSTRADE_GCS_STATE_BUCKET` / `FIRSTRADE_STATE_PREFIX` 下最近的 strategy-run JSON, +读取 `status/stage/errors`,如果没有近期 report 或 report 呈错误状态就发 Telegram。 +GitHub deploy service account 需要对 state bucket 有对象读取/列举权限。 + 请不要把 Firstrade 登录凭据、MFA secret、cookie 文件提交到 Git。`.env`、 `.runtime/` 和 `ft_cookies*.json` 已经在 `.gitignore` 中。 diff --git a/scripts/execution_report_heartbeat.py b/scripts/execution_report_heartbeat.py new file mode 100644 index 0000000..7e4bd7e --- /dev/null +++ b/scripts/execution_report_heartbeat.py @@ -0,0 +1,422 @@ +#!/usr/bin/env python3 +"""Verify that an expected runtime report was written recently.""" + +from __future__ import annotations + +import datetime as dt +import json +import os +import re +import subprocess +import sys +import urllib.parse +import urllib.request +from typing import Any + + +DEFAULT_ACCEPT_STATUSES = {"ok", "skipped", "success", "completed", "no_action"} +DEFAULT_REJECT_STATUSES = {"error", "failed", "failure", "cancelled", "canceled", "timed_out"} +DEFAULT_ACCEPT_STAGES = { + "DRY_RUN_COMPLETED", + "FUNDING_BLOCKED", + "NO_ACTION", + "ORDERS_PLANNED", + "PARTIAL_SUBMITTED", + "RECONCILED", + "SUBMITTED", + "COMPLETED", +} +DEFAULT_REJECT_STAGES = {"ERROR", "EXECUTION_BLOCKED", "FAILED", "FAILURE"} + + +def _split_values(raw: str | None) -> list[str]: + if not raw: + return [] + return [part.strip() for part in re.split(r"[,;\n]+", raw) if part.strip()] + + +def _env_bool(name: str, default: bool = False) -> bool: + value = (os.environ.get(name) or "").strip().lower() + if not value: + return default + return value in {"1", "true", "yes", "y", "on"} + + +def _parse_timestamp(value: Any) -> dt.datetime | None: + if not value: + return None + text = str(value).strip().replace("Z", "+00:00") + try: + parsed = dt.datetime.fromisoformat(text) + except ValueError: + return None + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=dt.timezone.utc) + return parsed.astimezone(dt.timezone.utc) + + +def _month_segments(start: dt.datetime, end: dt.datetime) -> list[str]: + months = [] + cursor = dt.datetime(start.year, start.month, 1, tzinfo=dt.timezone.utc) + end_cursor = dt.datetime(end.year, end.month, 1, tzinfo=dt.timezone.utc) + while cursor <= end_cursor: + months.append(f"{cursor.year:04d}-{cursor.month:02d}") + if cursor.month == 12: + cursor = dt.datetime(cursor.year + 1, 1, 1, tzinfo=dt.timezone.utc) + else: + cursor = dt.datetime(cursor.year, cursor.month + 1, 1, tzinfo=dt.timezone.utc) + return months + + +def _base_report_uris() -> list[str]: + uris = _split_values(os.environ.get("RUNTIME_HEARTBEAT_GCS_URIS")) + uris.extend(_split_values(os.environ.get("EXECUTION_REPORT_GCS_URI"))) + + state_bucket = (os.environ.get("FIRSTRADE_GCS_STATE_BUCKET") or "").strip() + if state_bucket: + state_prefix = (os.environ.get("FIRSTRADE_STATE_PREFIX") or "firstrade-platform").strip("/") + base = f"gs://{state_bucket}" + if state_prefix: + base = f"{base}/{state_prefix}" + uris.append(f"{base}/strategy-runs") + + seen = set() + unique = [] + for uri in uris: + clean = uri.rstrip("/") + if clean and clean not in seen: + seen.add(clean) + unique.append(clean) + return unique + + +def _load_required_services() -> list[str]: + services = [] + for name in ( + "RUNTIME_HEARTBEAT_REQUIRED_SERVICES", + "CLOUD_RUN_SERVICES", + "CLOUD_RUN_SERVICE", + ): + services.extend(_split_values(os.environ.get(name))) + + raw_targets = (os.environ.get("CLOUD_RUN_SERVICE_TARGETS_JSON") or "").strip() + if raw_targets: + try: + payload = json.loads(raw_targets) + targets = payload.get("targets") if isinstance(payload, dict) else payload + if isinstance(targets, list): + for target in targets: + if not isinstance(target, dict): + continue + runtime_target = target.get("runtime_target") or target.get("runtime_target_json") + if isinstance(runtime_target, str): + try: + runtime_target = json.loads(runtime_target) + except json.JSONDecodeError: + runtime_target = {} + for key in ("service", "service_name", "cloud_run_service"): + value = target.get(key) or ( + runtime_target.get(key) if isinstance(runtime_target, dict) else None + ) + if value: + services.extend(_split_values(str(value))) + break + except json.JSONDecodeError: + pass + + seen = set() + unique = [] + for service in services: + if service not in seen: + seen.add(service) + unique.append(service) + return unique + + +def _report_globs(since: dt.datetime, now: dt.datetime) -> list[str]: + explicit = _split_values(os.environ.get("RUNTIME_HEARTBEAT_GCS_GLOBS")) + if explicit: + return explicit + globs = [] + platform = (os.environ.get("RUNTIME_HEARTBEAT_REPORT_PLATFORM") or "").strip("/") + for base in _base_report_uris(): + if platform and not base.rstrip("/").endswith(f"/{platform}"): + base = f"{base}/{platform}" + for month in _month_segments(since, now): + globs.append(f"{base}/**/{month}/*.json") + return globs + + +def _run_gcloud(args: list[str]) -> subprocess.CompletedProcess[str]: + return subprocess.run(args, text=True, capture_output=True, check=False) + + +def _list_gcs_objects(gcs_glob: str, *, project: str | None) -> list[dict[str, Any]]: + command = ["gcloud", "storage", "ls", "--json", gcs_glob] + if project: + command.extend(["--project", project]) + result = _run_gcloud(command) + if result.returncode != 0: + detail = (result.stderr or result.stdout or "").strip() + if "matched no objects" in detail.lower() or "no urls matched" in detail.lower(): + return [] + raise RuntimeError(detail or f"gcloud storage ls failed for {gcs_glob}") + if not result.stdout.strip(): + return [] + try: + payload = json.loads(result.stdout) + except json.JSONDecodeError as exc: + raise RuntimeError(f"gcloud storage ls returned invalid JSON for {gcs_glob}: {exc}") from exc + return payload if isinstance(payload, list) else [] + + +def _cat_gcs_json(uri: str, *, project: str | None) -> dict[str, Any] | None: + clean_uri = uri.split("#", 1)[0] + command = ["gcloud", "storage", "cat", clean_uri] + if project: + command.extend(["--project", project]) + result = _run_gcloud(command) + if result.returncode != 0: + print(f"Unable to read report {clean_uri}: {(result.stderr or result.stdout).strip()}", file=sys.stderr) + return None + try: + payload = json.loads(result.stdout) + except json.JSONDecodeError: + print(f"Report is not valid JSON: {clean_uri}", file=sys.stderr) + return None + return payload if isinstance(payload, dict) else None + + +def _object_uri(entry: dict[str, Any]) -> str: + return str(entry.get("url") or "").split("#", 1)[0] + + +def _object_updated_at(entry: dict[str, Any]) -> dt.datetime | None: + metadata = entry.get("metadata") or {} + return _parse_timestamp(metadata.get("updated") or metadata.get("timeCreated")) + + +def _report_errors(payload: dict[str, Any]) -> list[Any]: + errors = payload.get("errors") + if isinstance(errors, list) and errors: + return errors + error_summary = payload.get("error_summary") + if isinstance(error_summary, dict): + nested = error_summary.get("errors") + if isinstance(nested, list) and nested: + return nested + if payload.get("error"): + return [payload.get("error")] + return [] + + +def _report_status(payload: dict[str, Any]) -> tuple[str, str]: + status = str(payload.get("status") or payload.get("summary", {}).get("status") or "").strip() + stage = str(payload.get("stage") or payload.get("summary", {}).get("stage") or "").strip() + return status, stage + + +def _payload_service_name(payload: dict[str, Any]) -> str: + runtime_target = payload.get("runtime_target") + service = payload.get("service_name") + if not service and isinstance(runtime_target, dict): + service = runtime_target.get("service_name") + return str(service or "").strip() + + +def _payload_account_scope(payload: dict[str, Any]) -> str: + for key in ("account_scope", "account_group", "account_region"): + value = payload.get(key) + if value: + return str(value).strip() + return "" + + +def _payload_matches(payload: dict[str, Any], required_services: list[str]) -> tuple[bool, str, str]: + expected_platform = (os.environ.get("RUNTIME_HEARTBEAT_REPORT_PLATFORM") or "").strip().lower() + if expected_platform: + platform = str(payload.get("platform") or "").strip().lower() + if platform != expected_platform: + return False, "", f"platform={platform or '-'}" + + expected_scope = (os.environ.get("RUNTIME_HEARTBEAT_ACCOUNT_SCOPE") or "").strip().lower() + if expected_scope: + account_scope = _payload_account_scope(payload).lower() + if account_scope != expected_scope: + return False, "", f"account_scope={account_scope or '-'}" + + service_name = _payload_service_name(payload) + if required_services and service_name not in required_services: + return False, service_name, f"service_name={service_name or '-'}" + return True, service_name, "matched filters" + + +def _is_accepted_report(payload: dict[str, Any]) -> tuple[bool, str]: + accepted_statuses = { + value.lower() + for value in (_split_values(os.environ.get("RUNTIME_HEARTBEAT_ACCEPT_STATUSES")) or DEFAULT_ACCEPT_STATUSES) + } + reject_statuses = { + value.lower() + for value in (_split_values(os.environ.get("RUNTIME_HEARTBEAT_REJECT_STATUSES")) or DEFAULT_REJECT_STATUSES) + } + accepted_stages = { + value.upper() + for value in (_split_values(os.environ.get("RUNTIME_HEARTBEAT_ACCEPT_STAGES")) or DEFAULT_ACCEPT_STAGES) + } + reject_stages = { + value.upper() + for value in (_split_values(os.environ.get("RUNTIME_HEARTBEAT_REJECT_STAGES")) or DEFAULT_REJECT_STAGES) + } + allow_errors = _env_bool("RUNTIME_HEARTBEAT_ACCEPT_REPORTS_WITH_ERRORS", False) + + status, stage = _report_status(payload) + status_key = status.lower() + stage_key = stage.upper() + errors = _report_errors(payload) + if errors and not allow_errors: + return False, f"errors={len(errors)} status={status or '-'} stage={stage or '-'}" + if status_key in reject_statuses or stage_key in reject_stages: + return False, f"rejected status={status or '-'} stage={stage or '-'}" + if status_key and status_key in accepted_statuses: + return True, f"status={status}" + if stage_key and stage_key in accepted_stages: + return True, f"stage={stage}" + if not status_key and not stage_key and not errors and not _env_bool("RUNTIME_HEARTBEAT_REQUIRE_STATUS", False): + return True, "report exists" + return False, f"unaccepted status={status or '-'} stage={stage or '-'}" + + +def _send_telegram(message: str) -> bool: + targets: list[tuple[str, str]] = [] + crisis_token = os.environ.get("CRISIS_ALERT_TELEGRAM_BOT_TOKEN") + for chat_id in _split_values(os.environ.get("CRISIS_ALERT_TELEGRAM_CHAT_IDS")): + if crisis_token: + targets.append((crisis_token, chat_id)) + token = os.environ.get("TELEGRAM_TOKEN") or os.environ.get("TG_TOKEN") + for chat_id in _split_values(os.environ.get("GLOBAL_TELEGRAM_CHAT_ID")): + if token: + targets.append((token, chat_id)) + unique_targets = list(dict.fromkeys(targets)) + if not unique_targets: + print("No Telegram token/chat configured; unable to send heartbeat alert.", file=sys.stderr) + return False + base_url = os.environ.get("CRISIS_ALERT_TELEGRAM_API_BASE_URL") or "https://api.telegram.org" + ok = True + for token_value, chat_id in unique_targets: + body = urllib.parse.urlencode({"chat_id": chat_id, "text": message}).encode() + request = urllib.request.Request( + f"{base_url.rstrip('/')}/bot{token_value}/sendMessage", + data=body, + method="POST", + ) + try: + with urllib.request.urlopen(request, timeout=15) as response: + ok = ok and response.status < 400 + except Exception as exc: # noqa: BLE001 + ok = False + print(f"Telegram send failed: {exc}", file=sys.stderr) + return ok + + +def main() -> int: + project = ( + os.environ.get("RUNTIME_HEARTBEAT_GCP_PROJECT_ID") + or os.environ.get("GCP_PROJECT_ID") + or os.environ.get("GOOGLE_CLOUD_PROJECT") + ) + name = os.environ.get("RUNTIME_HEARTBEAT_NAME") or os.environ.get("GITHUB_REPOSITORY") or "runtime" + lookback_hours = float(os.environ.get("RUNTIME_HEARTBEAT_LOOKBACK_HOURS") or "36") + max_reports = int(os.environ.get("RUNTIME_HEARTBEAT_MAX_REPORTS_TO_READ") or "20") + fail_workflow = _env_bool("RUNTIME_HEARTBEAT_FAIL_WORKFLOW_ON_ALERT", True) + required_services = _load_required_services() + + now = dt.datetime.now(dt.timezone.utc) + since = now - dt.timedelta(hours=lookback_hours) + globs = _report_globs(since, now) + if not globs: + raise SystemExit("No heartbeat GCS report URI configured") + + objects: dict[str, tuple[str, dt.datetime]] = {} + list_errors = [] + for gcs_glob in globs: + try: + for entry in _list_gcs_objects(gcs_glob, project=project): + uri = _object_uri(entry) + updated = _object_updated_at(entry) + if uri and updated and updated >= since: + objects[uri] = (uri, updated) + except RuntimeError as exc: + list_errors.append(f"{gcs_glob}: {exc}") + + sorted_objects = sorted(objects.values(), key=lambda item: item[1], reverse=True) + accepted = [] + accepted_by_service: dict[str, tuple[str, dt.datetime, str]] = {} + inspected = [] + for uri, updated in sorted_objects[:max_reports]: + payload = _cat_gcs_json(uri, project=project) + if payload is None: + inspected.append(f"- {updated.isoformat()} {uri} unreadable") + continue + matches, service_name, filter_reason = _payload_matches(payload, required_services) + if not matches: + inspected.append(f"- {updated.isoformat()} {uri} skipped {filter_reason}") + continue + ok, reason = _is_accepted_report(payload) + inspected.append(f"- {updated.isoformat()} {uri} {reason}") + if ok: + if required_services: + accepted_by_service[service_name] = (uri, updated, reason) + else: + accepted.append((uri, updated, reason)) + + if required_services: + missing = [service for service in required_services if service not in accepted_by_service] + if not missing: + details = ", ".join( + f"{service}@{accepted_by_service[service][1].isoformat()}" for service in required_services + ) + print(f"Execution report heartbeat OK for {name}: {details}") + return 0 + if accepted: + uri, updated, reason = accepted[0] + print( + f"Execution report heartbeat OK for {name}: {reason}, updated={updated.isoformat()}, uri={uri}" + ) + return 0 + + issues = [] + if list_errors: + issues.extend(f"list failed: {item}" for item in list_errors[:3]) + if not sorted_objects: + issues.append(f"no report object updated in the last {lookback_hours:g} hours") + elif required_services: + missing = [service for service in required_services if service not in accepted_by_service] + issues.append(f"missing acceptable report for service(s): {', '.join(missing)}") + else: + issues.append(f"no acceptable report among {min(len(sorted_objects), max_reports)} recent report object(s)") + + run_url = "" + if os.environ.get("GITHUB_SERVER_URL") and os.environ.get("GITHUB_REPOSITORY") and os.environ.get("GITHUB_RUN_ID"): + run_url = ( + f"{os.environ['GITHUB_SERVER_URL']}/{os.environ['GITHUB_REPOSITORY']}" + f"/actions/runs/{os.environ['GITHUB_RUN_ID']}" + ) + message_lines = [ + f"[Execution Report Heartbeat] {name}", + f"Lookback: {lookback_hours:g} hours", + "Issues:", + *[f"- {issue}" for issue in issues], + ] + if inspected: + message_lines.extend(["Recent reports:", *inspected[:max_reports]]) + if run_url: + message_lines.append(f"Workflow: {run_url}") + message = "\n".join(message_lines) + print(message) + _send_telegram(message[:3900]) + return 1 if fail_workflow else 0 + + +if __name__ == "__main__": + raise SystemExit(main())