From d8f6a83a537de2e85a84711e2cad45994f95c7d5 Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Tue, 2 Jun 2026 16:26:00 +0800 Subject: [PATCH] Notify Telegram on Firstrade runtime errors --- main.py | 101 ++++++++++++++++++++++++++++++++- tests/test_request_handling.py | 58 +++++++++++++++++++ 2 files changed, 156 insertions(+), 3 deletions(-) diff --git a/main.py b/main.py index 9a5686c..d4bc9b4 100644 --- a/main.py +++ b/main.py @@ -3,9 +3,10 @@ from __future__ import annotations import os +import traceback from datetime import datetime, timezone -from flask import Flask, jsonify +from flask import Flask, jsonify, request from application.firstrade_client import ( FirstradeBrokerClient, @@ -16,6 +17,7 @@ ) from application.rebalance_service import run_strategy_cycle from application.session_check_service import run_session_check +from notifications.telegram import build_sender from strategy_registry import get_platform_profile_status_matrix app = Flask(__name__) @@ -25,6 +27,79 @@ def _flag(name: str, default: str = "false") -> bool: return (os.getenv(name, default) or "").strip().lower() == "true" +def _split_env_list(value: str | None) -> tuple[str, ...]: + return tuple( + item.strip() + for item in str(value or "").replace(";", ",").split(",") + if item.strip() + ) + + +def _telegram_notification_targets() -> tuple[tuple[str, str], ...]: + targets: list[tuple[str, str]] = [] + main_token = os.getenv("TELEGRAM_TOKEN") + main_chat_id = os.getenv("GLOBAL_TELEGRAM_CHAT_ID") + if main_token and main_chat_id: + targets.append((main_token, main_chat_id)) + + crisis_token = os.getenv("CRISIS_ALERT_TELEGRAM_BOT_TOKEN") + for chat_id in _split_env_list(os.getenv("CRISIS_ALERT_TELEGRAM_CHAT_IDS")): + if crisis_token and chat_id: + targets.append((crisis_token, chat_id)) + + seen: set[tuple[str, str]] = set() + unique_targets: list[tuple[str, str]] = [] + for target in targets: + if target in seen: + continue + seen.add(target) + unique_targets.append(target) + return tuple(unique_targets) + + +def _runtime_error_notification_message(exc: Exception) -> str: + error_text = f"{type(exc).__name__}: {exc}" + if len(error_text) > 1200: + error_text = error_text[:1197] + "..." + return "\n".join( + ( + "Firstrade strategy run failed", + f"service: {os.getenv('K_SERVICE') or 'firstrade-quant-service'}", + f"revision: {os.getenv('K_REVISION') or ''}", + f"route: {request.method} {request.path}", + f"strategy: {os.getenv('STRATEGY_PROFILE') or ''}", + f"account_scope: {os.getenv('ACCOUNT_REGION') or ''}", + f"error: {error_text}", + ) + ) + + +def _notify_runtime_error(exc: Exception) -> bool: + targets = _telegram_notification_targets() + if not targets: + print( + "Firstrade runtime error notification skipped: no Telegram target configured.", + flush=True, + ) + return False + + message = _runtime_error_notification_message(exc) + attempted = False + for token, chat_id in targets: + attempted = True + try: + build_sender(token, chat_id)(message) + except Exception as send_exc: # pragma: no cover - build_sender normally handles this. + print(f"Firstrade runtime error Telegram send failed: {send_exc}", flush=True) + return attempted + + +def _handle_strategy_run_exception(exc: Exception) -> bool: + print(f"Firstrade strategy run failed: {type(exc).__name__}: {exc}", flush=True) + traceback.print_exc() + return _notify_runtime_error(exc) + + @app.get("/") def health(): return jsonify( @@ -128,9 +203,29 @@ def run_strategy(): try: return jsonify(run_strategy_cycle()) except (FirstradePlatformError, EnvironmentError, ValueError) as exc: - return jsonify({"ok": False, "error": str(exc)}), 500 + notification_attempted = _handle_strategy_run_exception(exc) + return ( + jsonify( + { + "ok": False, + "error": str(exc), + "runtime_error_notification_attempted": notification_attempted, + } + ), + 500, + ) except Exception as exc: - return jsonify({"ok": False, "error": f"{type(exc).__name__}: {exc}"}), 500 + notification_attempted = _handle_strategy_run_exception(exc) + return ( + jsonify( + { + "ok": False, + "error": f"{type(exc).__name__}: {exc}", + "runtime_error_notification_attempted": notification_attempted, + } + ), + 500, + ) @app.post("/precheck") diff --git a/tests/test_request_handling.py b/tests/test_request_handling.py index 4a9dc6b..33a008e 100644 --- a/tests/test_request_handling.py +++ b/tests/test_request_handling.py @@ -68,6 +68,64 @@ def test_root_post_calls_strategy_cycle_when_gate_enabled(monkeypatch): assert response.get_json() == {"ok": True, "action_done": False} +def test_run_endpoint_notifies_telegram_on_strategy_cycle_error(monkeypatch): + sent_messages = [] + + def fake_build_sender(token, chat_id): + def send(message): + sent_messages.append((token, chat_id, message)) + + return send + + monkeypatch.setenv("FIRSTRADE_RUN_STRATEGY_ON_HTTP", "true") + monkeypatch.setenv("TELEGRAM_TOKEN", "token-1") + monkeypatch.setenv("GLOBAL_TELEGRAM_CHAT_ID", "chat-1") + monkeypatch.setenv("STRATEGY_PROFILE", "mega_cap_leader_rotation_top50_balanced") + monkeypatch.setattr(main, "build_sender", fake_build_sender) + monkeypatch.setattr( + main, + "run_strategy_cycle", + lambda: (_ for _ in ()).throw(ValueError("snapshot denied")), + ) + client = main.app.test_client() + + response = client.post("/run") + + assert response.status_code == 500 + payload = response.get_json() + assert payload["ok"] is False + assert payload["error"] == "snapshot denied" + assert payload["runtime_error_notification_attempted"] is True + assert len(sent_messages) == 1 + assert sent_messages[0][0] == "token-1" + assert sent_messages[0][1] == "chat-1" + assert "Firstrade strategy run failed" in sent_messages[0][2] + assert "ValueError: snapshot denied" in sent_messages[0][2] + assert "strategy: mega_cap_leader_rotation_top50_balanced" in sent_messages[0][2] + + +def test_run_endpoint_error_does_not_require_telegram_config(monkeypatch): + monkeypatch.setenv("FIRSTRADE_RUN_STRATEGY_ON_HTTP", "true") + monkeypatch.delenv("TELEGRAM_TOKEN", raising=False) + monkeypatch.delenv("GLOBAL_TELEGRAM_CHAT_ID", raising=False) + monkeypatch.delenv("CRISIS_ALERT_TELEGRAM_BOT_TOKEN", raising=False) + monkeypatch.delenv("CRISIS_ALERT_TELEGRAM_CHAT_IDS", raising=False) + monkeypatch.setattr( + main, + "run_strategy_cycle", + lambda: (_ for _ in ()).throw(RuntimeError("boom")), + ) + client = main.app.test_client() + + response = client.post("/run") + + assert response.status_code == 500 + payload = response.get_json() + assert payload["ok"] is False + assert payload["error"] == "RuntimeError: boom" + assert payload["runtime_error_notification_attempted"] is False + + def test_scheduler_health_routes_accept_post(): client = main.app.test_client()