Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 98 additions & 3 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from __future__ import annotations

import os
import traceback
from datetime import datetime, timezone

from flask import Flask, jsonify
from flask import Flask, jsonify, request

from application.firstrade_client import (
FirstradeBrokerClient,
Expand All @@ -16,6 +17,7 @@
)
from application.rebalance_service import run_strategy_cycle
from application.session_check_service import run_session_check
from notifications.telegram import build_sender
from strategy_registry import get_platform_profile_status_matrix

app = Flask(__name__)
Expand All @@ -25,6 +27,79 @@ def _flag(name: str, default: str = "false") -> bool:
return (os.getenv(name, default) or "").strip().lower() == "true"


def _split_env_list(value: str | None) -> tuple[str, ...]:
return tuple(
item.strip()
for item in str(value or "").replace(";", ",").split(",")
if item.strip()
)


def _telegram_notification_targets() -> tuple[tuple[str, str], ...]:
targets: list[tuple[str, str]] = []
main_token = os.getenv("TELEGRAM_TOKEN")
main_chat_id = os.getenv("GLOBAL_TELEGRAM_CHAT_ID")
if main_token and main_chat_id:
targets.append((main_token, main_chat_id))

crisis_token = os.getenv("CRISIS_ALERT_TELEGRAM_BOT_TOKEN")
for chat_id in _split_env_list(os.getenv("CRISIS_ALERT_TELEGRAM_CHAT_IDS")):
if crisis_token and chat_id:
targets.append((crisis_token, chat_id))
Comment on lines +45 to +48

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Respect disabled crisis Telegram channels

When a deployment leaves CRISIS_ALERT_TELEGRAM_* configured but sets CRISIS_ALERT_CHANNELS to exclude telegram (the documented/runtime channel list is the enablement control for crisis alerts), this branch still adds those chat IDs and sends every /run fatal-error alert to Telegram. Gate these crisis targets on the channel list including telegram so disabled or staged Telegram credentials do not receive runtime-error notifications.

Useful? React with 👍 / 👎.


seen: set[tuple[str, str]] = set()
unique_targets: list[tuple[str, str]] = []
for target in targets:
if target in seen:
continue
seen.add(target)
unique_targets.append(target)
return tuple(unique_targets)


def _runtime_error_notification_message(exc: Exception) -> str:
error_text = f"{type(exc).__name__}: {exc}"
if len(error_text) > 1200:
error_text = error_text[:1197] + "..."
return "\n".join(
(
"Firstrade strategy run failed",
f"service: {os.getenv('K_SERVICE') or 'firstrade-quant-service'}",
f"revision: {os.getenv('K_REVISION') or '<unknown>'}",
f"route: {request.method} {request.path}",
f"strategy: {os.getenv('STRATEGY_PROFILE') or '<unset>'}",
f"account_scope: {os.getenv('ACCOUNT_REGION') or '<unset>'}",
f"error: {error_text}",
)
)


def _notify_runtime_error(exc: Exception) -> bool:
targets = _telegram_notification_targets()
if not targets:
print(
"Firstrade runtime error notification skipped: no Telegram target configured.",
flush=True,
)
return False

message = _runtime_error_notification_message(exc)
attempted = False
for token, chat_id in targets:
attempted = True
try:
build_sender(token, chat_id)(message)
except Exception as send_exc: # pragma: no cover - build_sender normally handles this.
print(f"Firstrade runtime error Telegram send failed: {send_exc}", flush=True)
return attempted


def _handle_strategy_run_exception(exc: Exception) -> bool:
print(f"Firstrade strategy run failed: {type(exc).__name__}: {exc}", flush=True)
traceback.print_exc()
return _notify_runtime_error(exc)


@app.get("/")
def health():
return jsonify(
Expand Down Expand Up @@ -128,9 +203,29 @@ def run_strategy():
try:
return jsonify(run_strategy_cycle())
except (FirstradePlatformError, EnvironmentError, ValueError) as exc:
return jsonify({"ok": False, "error": str(exc)}), 500
notification_attempted = _handle_strategy_run_exception(exc)
return (
jsonify(
{
"ok": False,
"error": str(exc),
"runtime_error_notification_attempted": notification_attempted,
}
),
500,
)
except Exception as exc:
return jsonify({"ok": False, "error": f"{type(exc).__name__}: {exc}"}), 500
notification_attempted = _handle_strategy_run_exception(exc)
return (
jsonify(
{
"ok": False,
"error": f"{type(exc).__name__}: {exc}",
"runtime_error_notification_attempted": notification_attempted,
}
),
500,
)


@app.post("/precheck")
Expand Down
58 changes: 58 additions & 0 deletions tests/test_request_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,64 @@ def test_root_post_calls_strategy_cycle_when_gate_enabled(monkeypatch):
assert response.get_json() == {"ok": True, "action_done": False}


def test_run_endpoint_notifies_telegram_on_strategy_cycle_error(monkeypatch):
sent_messages = []

def fake_build_sender(token, chat_id):
def send(message):
sent_messages.append((token, chat_id, message))

return send

monkeypatch.setenv("FIRSTRADE_RUN_STRATEGY_ON_HTTP", "true")
monkeypatch.setenv("TELEGRAM_TOKEN", "token-1")
monkeypatch.setenv("GLOBAL_TELEGRAM_CHAT_ID", "chat-1")
monkeypatch.setenv("STRATEGY_PROFILE", "mega_cap_leader_rotation_top50_balanced")
monkeypatch.setattr(main, "build_sender", fake_build_sender)
monkeypatch.setattr(
main,
"run_strategy_cycle",
lambda: (_ for _ in ()).throw(ValueError("snapshot denied")),
)
client = main.app.test_client()

response = client.post("/run")

assert response.status_code == 500
payload = response.get_json()
assert payload["ok"] is False
assert payload["error"] == "snapshot denied"
assert payload["runtime_error_notification_attempted"] is True
assert len(sent_messages) == 1
assert sent_messages[0][0] == "token-1"
assert sent_messages[0][1] == "chat-1"
assert "Firstrade strategy run failed" in sent_messages[0][2]
assert "ValueError: snapshot denied" in sent_messages[0][2]
assert "strategy: mega_cap_leader_rotation_top50_balanced" in sent_messages[0][2]


def test_run_endpoint_error_does_not_require_telegram_config(monkeypatch):
monkeypatch.setenv("FIRSTRADE_RUN_STRATEGY_ON_HTTP", "true")
monkeypatch.delenv("TELEGRAM_TOKEN", raising=False)
monkeypatch.delenv("GLOBAL_TELEGRAM_CHAT_ID", raising=False)
monkeypatch.delenv("CRISIS_ALERT_TELEGRAM_BOT_TOKEN", raising=False)
monkeypatch.delenv("CRISIS_ALERT_TELEGRAM_CHAT_IDS", raising=False)
monkeypatch.setattr(
main,
"run_strategy_cycle",
lambda: (_ for _ in ()).throw(RuntimeError("boom")),
)
client = main.app.test_client()

response = client.post("/run")

assert response.status_code == 500
payload = response.get_json()
assert payload["ok"] is False
assert payload["error"] == "RuntimeError: boom"
assert payload["runtime_error_notification_attempted"] is False


def test_scheduler_health_routes_accept_post():
client = main.app.test_client()

Expand Down