From 344e3ebc08a6e5adede196cc0d7e1dbd46ae68b5 Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Tue, 2 Jun 2026 18:13:04 +0800 Subject: [PATCH] Add Cloud Run runtime guard alerts --- .github/workflows/runtime-guard.yml | 69 +++++++ README.md | 52 +++++ scripts/cloud_run_runtime_guard.py | 309 ++++++++++++++++++++++++++++ 3 files changed, 430 insertions(+) create mode 100644 .github/workflows/runtime-guard.yml create mode 100644 scripts/cloud_run_runtime_guard.py diff --git a/.github/workflows/runtime-guard.yml b/.github/workflows/runtime-guard.yml new file mode 100644 index 0000000..ed2a9ed --- /dev/null +++ b/.github/workflows/runtime-guard.yml @@ -0,0 +1,69 @@ +name: Runtime Guard + +on: + workflow_dispatch: + inputs: + lookback_minutes: + description: "Cloud Logging lookback window in minutes." + required: false + type: string + default: "180" + require_success: + description: "Alert if no successful Cloud Run request exists in the lookback window." + required: false + type: choice + default: "false" + options: + - "false" + - "true" + fail_workflow_on_alert: + description: "Fail this workflow when an alert is emitted." + required: false + type: choice + default: "true" + options: + - "true" + - "false" + schedule: + - cron: "17,47 * * * *" + +env: + GCP_PROJECT_ID: firstradequant + GCP_WORKLOAD_IDENTITY_PROVIDER: projects/1088907247379/locations/global/workloadIdentityPools/github-actions/providers/github-main + GCP_WORKLOAD_IDENTITY_SERVICE_ACCOUNT: firstrade-platform-deploy@firstradequant.iam.gserviceaccount.com + +jobs: + guard: + name: Check Cloud Run runtime + runs-on: ubuntu-latest + permissions: + contents: read + id-token: write + env: + RUNTIME_GUARD_NAME: FirstradePlatform + RUNTIME_GUARD_CLOUD_RUN_SERVICES: ${{ vars.RUNTIME_GUARD_CLOUD_RUN_SERVICES }} + RUNTIME_GUARD_LOOKBACK_MINUTES: ${{ inputs.lookback_minutes || vars.RUNTIME_GUARD_LOOKBACK_MINUTES || '180' }} + RUNTIME_GUARD_REQUIRE_SUCCESS: ${{ inputs.require_success || vars.RUNTIME_GUARD_REQUIRE_SUCCESS || 'false' }} + RUNTIME_GUARD_FAIL_WORKFLOW_ON_ALERT: ${{ inputs.fail_workflow_on_alert || vars.RUNTIME_GUARD_FAIL_WORKFLOW_ON_ALERT || 'true' }} + RUNTIME_GUARD_SCHEDULER_JOB_PATTERN: ${{ vars.RUNTIME_GUARD_SCHEDULER_JOB_PATTERN }} + CLOUD_RUN_SERVICE: ${{ vars.CLOUD_RUN_SERVICE }} + GLOBAL_TELEGRAM_CHAT_ID: ${{ vars.GLOBAL_TELEGRAM_CHAT_ID }} + CRISIS_ALERT_TELEGRAM_CHAT_IDS: ${{ vars.CRISIS_ALERT_TELEGRAM_CHAT_IDS }} + CRISIS_ALERT_TELEGRAM_API_BASE_URL: ${{ vars.CRISIS_ALERT_TELEGRAM_API_BASE_URL }} + TELEGRAM_TOKEN: ${{ secrets.TELEGRAM_TOKEN }} + CRISIS_ALERT_TELEGRAM_BOT_TOKEN: ${{ secrets.CRISIS_ALERT_TELEGRAM_BOT_TOKEN }} + steps: + - name: Checkout repository + uses: actions/checkout@v6 + + - name: Authenticate to Google Cloud + uses: google-github-actions/auth@v3 + with: + workload_identity_provider: ${{ env.GCP_WORKLOAD_IDENTITY_PROVIDER }} + service_account: ${{ env.GCP_WORKLOAD_IDENTITY_SERVICE_ACCOUNT }} + + - name: Set up gcloud + uses: google-github-actions/setup-gcloud@v3 + + - name: Check Cloud Scheduler and Cloud Run logs + run: python scripts/cloud_run_runtime_guard.py diff --git a/README.md b/README.md index 420b3e8..f078c79 100644 --- a/README.md +++ b/README.md @@ -262,6 +262,36 @@ The main-push flag is an explicit automation ownership switch. Setting it to `true` keeps the deployed US runtime aligned with the latest `main` version while the live-order gates above still control whether `/run` can submit real orders. +## Runtime Guard Alerting + +This repo also includes `.github/workflows/runtime-guard.yml`, a GitHub Actions +guard for failures that happen outside the Flask handler. It reads Cloud Logging +for recent Cloud Scheduler errors and Cloud Run request/runtime failures, then +sends Telegram directly through `CRISIS_ALERT_TELEGRAM_BOT_TOKEN` + +`CRISIS_ALERT_TELEGRAM_CHAT_IDS` or the fallback `TELEGRAM_TOKEN` + +`GLOBAL_TELEGRAM_CHAT_ID`. + +The guard does not call `/run`, `/session-check`, or any trading endpoint. It is +a second notification layer for cases where Cloud Scheduler cannot reach Cloud +Run, OIDC/IAM/audience is wrong, Cloud Run returns 4xx/5xx, or the container +fails before the app-level Telegram fallback can run. + +Required setup: + +- keep `CLOUD_RUN_SERVICE` or `RUNTIME_GUARD_CLOUD_RUN_SERVICES` set to the + deployed service name +- give the GitHub deploy service account `roles/logging.viewer` on the GCP + project so it can read Cloud Logging +- keep Telegram chat/token variables or secrets configured in GitHub +- optionally set `RUNTIME_GUARD_SCHEDULER_JOB_PATTERN` to a regex that limits + Scheduler log checks to this service's jobs + +The scheduled guard checks every 30 minutes. To use it as a missed-run heartbeat, +set `RUNTIME_GUARD_REQUIRE_SUCCESS=true` and choose +`RUNTIME_GUARD_LOOKBACK_MINUTES` so the window covers the expected Firstrade +Scheduler run. The default leaves that heartbeat check off to avoid false alerts +outside trading windows. + ## Cloud Run Shape `main.py` exposes: @@ -386,6 +416,28 @@ Cloud Run 的入口。如果希望 GitHub 接管已部署运行时,仓库级 V `true` 后,美股运行时会跟随最新 `main` 部署;是否允许 `/run` 提交真实订单仍由上面的 live-order 安全闸控制。 +### Runtime Guard 告警 + +仓库还提供 `.github/workflows/runtime-guard.yml`。这个 workflow 不会调用 +`/run`、`/session-check` 或任何交易入口,只读取 Cloud Logging 中最近的 Cloud +Scheduler 错误和 Cloud Run 请求/运行失败,并直接用 +`CRISIS_ALERT_TELEGRAM_BOT_TOKEN` + `CRISIS_ALERT_TELEGRAM_CHAT_IDS` 或 fallback +的 `TELEGRAM_TOKEN` + `GLOBAL_TELEGRAM_CHAT_ID` 发 Telegram。 + +这层保护覆盖 Flask handler 还没来得及发通知的场景,例如 Scheduler 没打到 Cloud +Run、OIDC/IAM/audience 配错、Cloud Run 返回 4xx/5xx,或容器启动/导入阶段已经失败。 + +需要的配置: + +- `CLOUD_RUN_SERVICE` 或 `RUNTIME_GUARD_CLOUD_RUN_SERVICES` 指向已部署服务 +- GitHub deploy service account 需要项目级 `roles/logging.viewer`,用于读取 Cloud Logging +- GitHub 中继续配置 Telegram chat/token 变量或 secrets +- 可选设置 `RUNTIME_GUARD_SCHEDULER_JOB_PATTERN`,用正则把 Scheduler 日志限制到本服务的 job + +默认计划每 30 分钟检查一次。若要把它作为 missed-run 心跳检查,设置 +`RUNTIME_GUARD_REQUIRE_SUCCESS=true`,并把 `RUNTIME_GUARD_LOOKBACK_MINUTES` 设成覆盖 +Firstrade 预期 Scheduler 运行时间的窗口。默认不强制心跳,避免非交易窗口误报。 + 请不要把 Firstrade 登录凭据、MFA secret、cookie 文件提交到 Git。`.env`、 `.runtime/` 和 `ft_cookies*.json` 已经在 `.gitignore` 中。 diff --git a/scripts/cloud_run_runtime_guard.py b/scripts/cloud_run_runtime_guard.py new file mode 100644 index 0000000..561a808 --- /dev/null +++ b/scripts/cloud_run_runtime_guard.py @@ -0,0 +1,309 @@ +#!/usr/bin/env python3 +"""Check Cloud Scheduler and Cloud Run logs, then notify Telegram on failures.""" + +from __future__ import annotations + +import datetime as dt +import json +import os +import re +import subprocess +import sys +import urllib.parse +import urllib.request +from typing import Any + + +ERROR_SEVERITIES = {"ERROR", "CRITICAL", "ALERT", "EMERGENCY"} +FAILURE_WORDS = ( + "DEADLINE_EXCEEDED", + "INTERNAL_ERROR", + "PERMISSION_DENIED", + "UNAUTHENTICATED", + "URL_ERROR", + "URL_UNREACHABLE", +) + + +def _split_values(raw: str | None) -> list[str]: + if not raw: + return [] + return [part.strip() for part in re.split(r"[,;\n]+", raw) if part.strip()] + + +def _env_bool(name: str, default: bool = False) -> bool: + value = (os.environ.get(name) or "").strip().lower() + if not value: + return default + return value in {"1", "true", "yes", "y", "on"} + + +def _load_services() -> list[str]: + services = [] + for name in ( + "RUNTIME_GUARD_CLOUD_RUN_SERVICES", + "CLOUD_RUN_SERVICES", + "CLOUD_RUN_SERVICE", + ): + services.extend(_split_values(os.environ.get(name))) + + raw_targets = (os.environ.get("CLOUD_RUN_SERVICE_TARGETS_JSON") or "").strip() + if raw_targets: + try: + payload = json.loads(raw_targets) + targets = payload.get("targets") if isinstance(payload, dict) else payload + if isinstance(targets, list): + for target in targets: + if not isinstance(target, dict): + continue + runtime_target = target.get("runtime_target") or target.get( + "runtime_target_json" + ) + if isinstance(runtime_target, str): + try: + runtime_target = json.loads(runtime_target) + except json.JSONDecodeError: + runtime_target = {} + for key in ("service", "service_name", "cloud_run_service"): + value = target.get(key) or ( + runtime_target.get(key) + if isinstance(runtime_target, dict) + else None + ) + if value: + services.extend(_split_values(str(value))) + break + except json.JSONDecodeError as exc: + raise RuntimeError(f"CLOUD_RUN_SERVICE_TARGETS_JSON is invalid: {exc}") from exc + + seen = set() + unique = [] + for service in services: + if service not in seen: + seen.add(service) + unique.append(service) + return unique + + +def _run_gcloud_logging(project: str, log_filter: str, limit: int) -> list[dict[str, Any]]: + command = [ + "gcloud", + "logging", + "read", + log_filter, + "--project", + project, + "--format=json", + f"--limit={limit}", + ] + result = subprocess.run(command, text=True, capture_output=True, check=False) + if result.returncode != 0: + detail = (result.stderr or result.stdout or "").strip() + raise RuntimeError(detail or "gcloud logging read failed") + if not result.stdout.strip(): + return [] + try: + payload = json.loads(result.stdout) + except json.JSONDecodeError as exc: + raise RuntimeError(f"gcloud returned invalid JSON: {exc}") from exc + return payload if isinstance(payload, list) else [] + + +def _status(entry: dict[str, Any]) -> int | None: + value = (entry.get("httpRequest") or {}).get("status") + try: + return int(value) + except (TypeError, ValueError): + return None + + +def _entry_text(entry: dict[str, Any]) -> str: + chunks = [] + for key in ("textPayload", "message"): + value = entry.get(key) + if value: + chunks.append(str(value)) + for key in ("jsonPayload", "protoPayload"): + value = entry.get(key) + if value: + chunks.append(json.dumps(value, sort_keys=True)) + return " ".join(chunks) + + +def _is_failure(entry: dict[str, Any]) -> bool: + severity = str(entry.get("severity") or "").upper() + status = _status(entry) + text = _entry_text(entry).upper() + return ( + severity in ERROR_SEVERITIES + or (status is not None and status >= 400) + or any(word in text for word in FAILURE_WORDS) + ) + + +def _is_success(entry: dict[str, Any]) -> bool: + status = _status(entry) + return status is not None and 200 <= status < 400 + + +def _labels(entry: dict[str, Any]) -> dict[str, Any]: + resource = entry.get("resource") or {} + labels = resource.get("labels") or {} + return labels if isinstance(labels, dict) else {} + + +def _summarize(entry: dict[str, Any]) -> str: + labels = _labels(entry) + target = labels.get("service_name") or labels.get("job_id") or labels.get("job_name") + timestamp = str(entry.get("timestamp") or "-") + severity = str(entry.get("severity") or "-") + status = _status(entry) + status_text = f" status={status}" if status is not None else "" + text = re.sub(r"\s+", " ", _entry_text(entry)).strip() + if len(text) > 180: + text = text[:177] + "..." + suffix = f" {text}" if text else "" + return f"- {timestamp} {target or ''} severity={severity}{status_text}{suffix}" + + +def _send_telegram(message: str) -> bool: + targets: list[tuple[str, str]] = [] + crisis_token = os.environ.get("CRISIS_ALERT_TELEGRAM_BOT_TOKEN") + for chat_id in _split_values(os.environ.get("CRISIS_ALERT_TELEGRAM_CHAT_IDS")): + if crisis_token: + targets.append((crisis_token, chat_id)) + + token = os.environ.get("TELEGRAM_TOKEN") or os.environ.get("TG_TOKEN") + for chat_id in _split_values(os.environ.get("GLOBAL_TELEGRAM_CHAT_ID")): + if token: + targets.append((token, chat_id)) + + unique_targets = list(dict.fromkeys(targets)) + if not unique_targets: + print("No Telegram token/chat configured; unable to send runtime guard alert.", file=sys.stderr) + return False + + ok = True + base_url = os.environ.get("CRISIS_ALERT_TELEGRAM_API_BASE_URL") or "https://api.telegram.org" + for token_value, chat_id in unique_targets: + body = urllib.parse.urlencode({"chat_id": chat_id, "text": message}).encode() + request = urllib.request.Request( + f"{base_url.rstrip('/')}/bot{token_value}/sendMessage", + data=body, + method="POST", + ) + try: + with urllib.request.urlopen(request, timeout=15) as response: + if response.status >= 400: + ok = False + print(f"Telegram returned HTTP {response.status}", file=sys.stderr) + except Exception as exc: # noqa: BLE001 + ok = False + print(f"Telegram send failed: {exc}", file=sys.stderr) + return ok + + +def main() -> int: + project = ( + os.environ.get("RUNTIME_GUARD_GCP_PROJECT_ID") + or os.environ.get("GCP_PROJECT_ID") + or os.environ.get("GOOGLE_CLOUD_PROJECT") + ) + if not project: + raise SystemExit("GCP_PROJECT_ID or GOOGLE_CLOUD_PROJECT is required") + + name = os.environ.get("RUNTIME_GUARD_NAME") or os.environ.get("GITHUB_REPOSITORY") or "Cloud Run" + lookback_minutes = int(os.environ.get("RUNTIME_GUARD_LOOKBACK_MINUTES") or "180") + limit = int(os.environ.get("RUNTIME_GUARD_LOG_LIMIT") or "200") + require_success = _env_bool("RUNTIME_GUARD_REQUIRE_SUCCESS", False) + fail_workflow = _env_bool("RUNTIME_GUARD_FAIL_WORKFLOW_ON_ALERT", True) + check_scheduler = _env_bool("RUNTIME_GUARD_CHECK_SCHEDULER", True) + scheduler_pattern = os.environ.get("RUNTIME_GUARD_SCHEDULER_JOB_PATTERN") or "" + + since = ( + dt.datetime.now(dt.timezone.utc) - dt.timedelta(minutes=lookback_minutes) + ).replace(microsecond=0) + since_text = since.isoformat().replace("+00:00", "Z") + + issues: list[str] = [] + details: list[str] = [] + success_count = 0 + + try: + services = _load_services() + except RuntimeError as exc: + services = [] + issues.append(f"service configuration error: {exc}") + + for service in services: + log_filter = ( + 'resource.type="cloud_run_revision" ' + f'AND resource.labels.service_name="{service}" ' + f'AND timestamp >= "{since_text}"' + ) + try: + entries = _run_gcloud_logging(project, log_filter, limit) + except RuntimeError as exc: + issues.append(f"Cloud Run log query failed for {service}: {exc}") + continue + failures = [entry for entry in entries if _is_failure(entry)] + success_count += sum(1 for entry in entries if _is_success(entry)) + if failures: + issues.append(f"{len(failures)} Cloud Run failure log(s) for {service}") + details.extend(_summarize(entry) for entry in failures[:5]) + + if services and require_success and success_count == 0: + issues.append( + f"no successful Cloud Run request found for {', '.join(services)} in the last {lookback_minutes} minutes" + ) + + if check_scheduler: + log_filter = f'resource.type="cloud_scheduler_job" AND timestamp >= "{since_text}"' + try: + entries = _run_gcloud_logging(project, log_filter, limit) + if scheduler_pattern: + regex = re.compile(scheduler_pattern) + entries = [ + entry + for entry in entries + if regex.search(str(_labels(entry).get("job_id") or _labels(entry).get("job_name") or "")) + ] + failures = [entry for entry in entries if _is_failure(entry)] + if failures: + issues.append(f"{len(failures)} Cloud Scheduler failure log(s)") + details.extend(_summarize(entry) for entry in failures[:5]) + except RuntimeError as exc: + issues.append(f"Cloud Scheduler log query failed: {exc}") + + if not issues: + service_text = ", ".join(services) if services else "" + print( + f"Runtime guard OK for {name}: services={service_text}, lookback={lookback_minutes}m, successes={success_count}" + ) + return 0 + + run_url = "" + if os.environ.get("GITHUB_SERVER_URL") and os.environ.get("GITHUB_REPOSITORY") and os.environ.get("GITHUB_RUN_ID"): + run_url = ( + f"{os.environ['GITHUB_SERVER_URL']}/{os.environ['GITHUB_REPOSITORY']}" + f"/actions/runs/{os.environ['GITHUB_RUN_ID']}" + ) + message_lines = [ + f"[Runtime Guard] {name}", + f"Project: {project}", + f"Lookback: {lookback_minutes} minutes", + "Issues:", + *[f"- {issue}" for issue in issues], + ] + if details: + message_lines.extend(["Details:", *details[:10]]) + if run_url: + message_lines.append(f"Workflow: {run_url}") + message = "\n".join(message_lines) + print(message) + _send_telegram(message[:3900]) + return 1 if fail_workflow else 0 + + +if __name__ == "__main__": + raise SystemExit(main())