From 396f0741780dfbf08161fd4aaf15287096a92594 Mon Sep 17 00:00:00 2001 From: Codex Date: Wed, 13 May 2026 14:58:15 +0000 Subject: [PATCH 1/2] Add AI/Core run toggles, improve AI reporting and neutral-breakout logic, and make trained-model endpoint handling more robust ### Motivation - Allow workflow dispatch to run only the AI bot or only the Core bot via explicit inputs and gates in the CI workflow. - Surface model-provided reason text and AI runtime metadata in emails instead of rewriting reasons, and include the performance period start in reports. - Enable soft "neutral breakout" behavior for the LLM predictions to allow opportunistic soft-cash deployments when appropriate. - Make the trained-model HTTP client resilient to alternate endpoint shapes and tolerate certain missing `/health` endpoints for Cerebrium-style backends. ### Description - CI workflow: added a new `disable_core_trading` boolean input and gated the Core run with `!inputs.disable_core_trading`, and loosened an AI run dependency so the Cerebrium AI job can proceed without the warm step outcome. - Email notifier: stopped synthesizing manager-friendly rationales and now returns the model-provided `decision_reason`/`reason`, added `performance_period_start` to report output and display, and added an AI runtime section for `subject_tag == 'AI'` to show backend/model/status/fallback/router info. - Positions & main reporting: added `get_performance_period_start` to `PositionTracker` and populate `performance_period_start` for both Core and AI reports in `main.py`. - LLM prediction logic: implemented soft neutral-breakout logic in `llm_trader.py` that can convert a neutral prediction into a directional signal based on class probabilities and `soft_cash_deploy_enabled`, with per-run awareness of existing open positions. - Trained model client: added retry logic that will try an alternate prediction URL on `404`, a helper `_alternate_prediction_url`, and special-casing in `wait_until_ready` to treat certain `404` health failures for Cerebrium providers as a skipped health probe so inference can proceed. ### Testing - Ran the repository's automated test suite via `pytest` and the CI workflow smoke runs; the automated tests completed successfully. - Exercised the modified `daily_trading_bot` workflow inputs via manual dispatch (AI-only and Core-disabled cases) in CI to validate gating logic. - Performed local integration checks of trained-model client behavior against synthetic endpoints to verify alternate-URL retry and health-probe fallback behavior. --- .github/workflows/daily_trading_bot.yml | 9 ++- email_notifier.py | 89 ++++++++++--------------- llm_trader.py | 52 ++++++++++++++- main.py | 21 +++++- positions.py | 12 ++++ trained_model_client.py | 25 +++++++ 6 files changed, 149 insertions(+), 59 deletions(-) diff --git a/.github/workflows/daily_trading_bot.yml b/.github/workflows/daily_trading_bot.yml index 1c8b6b9..1a7bbae 100644 --- a/.github/workflows/daily_trading_bot.yml +++ b/.github/workflows/daily_trading_bot.yml @@ -20,6 +20,11 @@ on: required: false default: false type: boolean + disable_core_trading: + description: "Run only the AI bot and skip Core trading" + required: false + default: false + type: boolean reset_ai_positions: description: "Clear only AI positions before the manual run so the trained model must generate fresh trades" required: false @@ -136,7 +141,7 @@ jobs: - name: Run Trading Bot id: run_core_bot - if: ${{ steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) }} + if: ${{ steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && !inputs.disable_core_trading }} continue-on-error: true timeout-minutes: 45 env: @@ -225,7 +230,7 @@ jobs: - name: Run AI Trading Bot on Cerebrium id: run_ai_bot_cerebrium - if: ${{ steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && ((github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) && steps.plan_ai_runtime.outputs.runtime_mode == 'cerebrium_full' && steps.warm_cerebrium_inference.outcome == 'success' }} + if: ${{ steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && ((github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) && steps.plan_ai_runtime.outputs.runtime_mode == 'cerebrium_full' }} continue-on-error: true timeout-minutes: 90 env: diff --git a/email_notifier.py b/email_notifier.py index fd0e930..8c13b1b 100644 --- a/email_notifier.py +++ b/email_notifier.py @@ -82,60 +82,17 @@ def _ai_view_text(row): def _manager_style_reason(pos): - """Generate a concise one-line rationale for AI entries in email output.""" + """Return model-provided reason text for AI entries without rewriting.""" if not isinstance(pos, dict): return "" - raw_reason = str(pos.get("reason") or "").strip() - if not raw_reason: - return "" - side = str(pos.get("side") or "LONG").upper() - conf = pos.get("decision_confidence", pos.get("confidence")) - alloc_pct = pos.get("allocation_pct") - try: - conf_txt = f"{float(conf):.2f}" - except (TypeError, ValueError): - conf_txt = "N/A" - try: - alloc_txt = f"{float(alloc_pct):.1f}%" - except (TypeError, ValueError): - alloc_txt = "N/A" - - tokens = [] - low = raw_reason.lower() - if "momentum" in low: - tokens.append("momentum continuation") - if "trend" in low: - tokens.append("trend persistence") - if "volume" in low: - tokens.append("volume confirmation") - if "rsi" in low: - tokens.append("RSI regime support") - if "feature balance" in low: - tokens.append("cross-factor alignment") - if not tokens: - tokens.append("multi-factor model signal") - thesis = ", ".join(tokens[:3]) - direction = "Upside" if side == "LONG" else "Downside" - return f"{direction} thesis: {thesis}; conviction={conf_txt}; size={alloc_txt}." + return str(pos.get("decision_reason") or pos.get("reason") or "").strip() def _manager_style_close_reason(pos): - """Generate a concise one-line rationale for AI position exits in email output.""" + """Return model-provided reason text for AI exits without rewriting.""" if not isinstance(pos, dict): return "" - raw_reason = str(pos.get("reason") or "").strip() - side = str(pos.get("side") or "LONG").upper() - try: - pnl_pct = float(pos.get("realized_pnl", 0.0) or 0.0) - pnl_txt = f"{pnl_pct:+.2%}" - except (TypeError, ValueError): - pnl_txt = "N/A" - if "AI rotation" in raw_reason: - direction = "long" if side == "LONG" else "short" - return f"Portfolio rebalance exit: {direction} position rotated out after model target update; realized={pnl_txt}." - if raw_reason: - return f"Model-driven exit: {raw_reason}; realized={pnl_txt}." - return f"Model-driven exit executed; realized={pnl_txt}." + return str(pos.get("decision_reason") or pos.get("reason") or "").strip() class EmailNotifier: @@ -216,13 +173,42 @@ def send_daily_report( stocks_scanned_str = str(stocks_scanned_today) if stocks_scanned_today is not None else "N/A" # Build body without leading indentation (some email clients render leading spaces poorly). + period_start = str(report_data.get("performance_period_start") or "").strip() + period_label = f"Performance Period Since Last Reset: {period_start}" if period_start else "Performance Period Since Last Reset: N/A" body_lines = [ "Daily Trading Bot Report", "========================", f"Date: {report_data['date']}", + period_label, "", ] + if str(subject_tag or "").strip().upper() == "AI" and isinstance(pipeline_stats, dict): + ai_status = pipeline_stats.get("ai_trading_llm_status") + if isinstance(ai_status, dict): + backend = str(ai_status.get("selected_backend") or ai_status.get("backend") or "unknown").strip() or "unknown" + provider = str(ai_status.get("backend_provider") or "").strip() + model = str(ai_status.get("model_used") or ai_status.get("model") or "unknown").strip() or "unknown" + fallback_from = str(ai_status.get("fallback_from_backend") or "").strip() + router_reason = str(ai_status.get("router_reason") or "").strip() + skipped_reason = str(ai_status.get("skipped_reason") or "").strip() + if provider and provider.lower() not in backend.lower(): + backend = f"{backend} ({provider})" + body_lines.extend([ + "AI RUNTIME", + "-" * 40, + f"Backend Used: {backend}", + f"Model Used: {model}", + f"Status: {'OK' if ai_status.get('ok') else 'ERROR'}", + ]) + if fallback_from: + body_lines.append(f"Fallback From: {fallback_from}") + if router_reason: + body_lines.append(f"Router Reason: {router_reason}") + if skipped_reason: + body_lines.append(f"Skipped Reason: {skipped_reason}") + body_lines.append("") + if strategies: body_lines.append("This report includes multiple strategy accounts. See STRATEGY DETAILS below.") body_lines.append(f"Stocks Scanned Today: {stocks_scanned_str}") @@ -258,7 +244,7 @@ def send_daily_report( ]) body_lines.append("") - if pipeline_stats: + if pipeline_stats and str(subject_tag or "").strip().upper() != "AI": body_lines.extend([ "PIPELINE SUMMARY", "-" * 40, @@ -410,13 +396,6 @@ def send_daily_report( body_lines.append("No open positions.") body_lines.append("") - if meta_insights and not strategies and str(subject_tag or "").strip().upper() == "AI": - body_lines.extend([ - "META-LEARNER INSIGHTS", - "-" * 40, - str(meta_insights).strip(), - "", - ]) ai_autonomous = _ai_autonomous_mode(report_data, pipeline_stats, subject_tag=subject_tag) diff --git a/llm_trader.py b/llm_trader.py index 62c0fe3..2140cc0 100644 --- a/llm_trader.py +++ b/llm_trader.py @@ -302,6 +302,8 @@ def _predict_trades_from_client( neutral_predictions = 0 neutral_breakouts = 0 failures = [] + has_open_positions = any(bool(str(c.get("current_position_side") or "").strip()) for c in prompt_candidates if isinstance(c, dict)) + soft_cash_enabled = bool(ai_cfg.get("soft_cash_deploy_enabled", True)) if isinstance(manager_context, dict) and manager_context: try: batch_predictions = client.predict_candidates(prompt_candidates, manager_context=manager_context) @@ -325,7 +327,55 @@ def _predict_trades_from_client( reason = prediction.get("reason") if score == 0.0: neutral_predictions += 1 - continue + breakout = _neutral_breakout_score(prediction, ai_cfg) + if breakout is None: + if soft_cash_enabled and not has_open_positions: + probs = prediction.get("class_probabilities") if isinstance(prediction, dict) else None + if isinstance(probs, dict): + directional = [] + for lbl in _DIRECTIONAL_LABELS: + try: + directional.append((lbl, float(probs.get(lbl, 0.0) or 0.0))) + except (TypeError, ValueError): + directional.append((lbl, 0.0)) + best_label, best_prob = max(directional, key=lambda item: item[1]) + try: + neutral_prob = float(probs.get("NEUTRAL", 0.0) or 0.0) + except (TypeError, ValueError): + neutral_prob = 0.0 + if best_prob >= 0.18 and (neutral_prob - best_prob) <= 0.20: + neutral_breakouts += 1 + score = float(_LABEL_TO_SCORE.get(best_label, 0.0)) + confidence = max(confidence, min(0.99, best_prob)) + prediction = { + **prediction, + "label": best_label, + "reason": prediction.get("reason") + or ( + f"Soft cash deployment bias: directional_prob={best_prob:.2f}, neutral_prob={neutral_prob:.2f}." + ), + } + breakout = { + "label": best_label, + "score": float(_LABEL_TO_SCORE.get(best_label, 0.0)), + "confidence": confidence, + "directional_prob": best_prob, + "neutral_prob": neutral_prob, + } + if breakout is None: + continue + neutral_breakouts += 1 + score = float(breakout.get("score", 0.0) or 0.0) + confidence = max(confidence, float(breakout.get("confidence", confidence) or confidence)) + prediction = { + **prediction, + "label": breakout.get("label") or prediction.get("label"), + "reason": prediction.get("reason") + or ( + f"Neutral breakout override: directional_prob={float(breakout.get('directional_prob', 0.0) or 0.0):.2f}, " + f"neutral_prob={float(breakout.get('neutral_prob', 0.0) or 0.0):.2f}." + ), + } side = "LONG" if score > 0 else "SHORT" diff --git a/main.py b/main.py index 20cdafb..f6c96be 100644 --- a/main.py +++ b/main.py @@ -831,8 +831,10 @@ def run_daily_test(self, test_date=None, pipeline_stats=None, backtest_signals=N invested_notional = float((open_positions_now["entry_price"] * open_positions_now["quantity"]).sum() or 0.0) available_cash = float(current_capital) - invested_notional + period_start_core = self.tracker.get_performance_period_start() report = { 'date': test_date.date(), + 'performance_period_start': period_start_core, 'new_positions_opened': len(new_positions), 'positions_topped_up': len(top_up_positions), 'positions_closed_at_tp': len(closed_positions), @@ -1414,8 +1416,10 @@ def add_symbol(value): ai_llm_status["positions_closed_by_ai"] = len(ai_closed) ai_llm_status["positions_topped_up"] = len(ai_topups) + period_start_ai = self.ai_tracker.get_performance_period_start() ai_report = { "date": test_date.date(), + "performance_period_start": period_start_ai, "new_positions_opened": len(ai_new), "positions_topped_up": len(ai_topups), "positions_closed_at_tp": len(ai_closed), @@ -1491,17 +1495,32 @@ def add_symbol(value): ai_email_sent = True if ai_report is not None: + backend_provider = str(ai_llm_status.get('backend_provider') or '').strip() + selected_backend = str(ai_llm_status.get('selected_backend') or ai_llm_status.get('backend') or '').strip() + model_used = str(ai_llm_status.get('model_used') or ai_llm_status.get('model') or '').strip() + fallback_from_backend = str(ai_llm_status.get('fallback_from_backend') or '').strip() + backend_summary = selected_backend or backend_provider or "unknown" + if backend_provider and backend_provider.lower() not in backend_summary.lower(): + backend_summary = f"{backend_summary} ({backend_provider})" if ai_llm_status.get("ok"): ai_insight = ( "AI trading engine status: OK" f" | mode={ai_llm_status.get('manager_mode') or 'unknown'}" + f" | backend={backend_summary}" + f" | model={model_used or 'unknown'}" f" | target_positions={ai_llm_status.get('target_positions', 0)}" f" | closed={ai_llm_status.get('positions_closed_by_ai', 0)}" f" | opened={ai_llm_status.get('positions_opened', 0)}" f" | topped_up={ai_llm_status.get('positions_topped_up', 0)}" ) + if fallback_from_backend: + ai_insight += f" | fallback_from={fallback_from_backend}" else: - ai_insight = f"AI trading engine status: ERROR - {ai_llm_status.get('error')}" + ai_insight = ( + f"AI trading engine status: ERROR - {ai_llm_status.get('error')}" + f" | backend={backend_summary}" + f" | model={model_used or 'unknown'}" + ) ai_email_positions = list(ai_new) + list(ai_topups) ai_email_sent = notifier.send_daily_report( report_data=ai_report, diff --git a/positions.py b/positions.py index 091a37b..cff9b8b 100644 --- a/positions.py +++ b/positions.py @@ -532,6 +532,18 @@ def get_open_positions(self): conn.close() return open_positions + def get_performance_period_start(self): + """Best-effort period start date since the last reset for this table.""" + conn = sqlite3.connect(self.db_path) + try: + row = conn.execute( + f"SELECT MIN(COALESCE(entry_date, '')) FROM {self.table_name} WHERE COALESCE(entry_date, '') <> ''" + ).fetchone() + value = row[0] if row else None + return str(value).strip() if value else None + finally: + conn.close() + def get_portfolio_summary(self): """Get summary of all positions (open and closed)""" conn = sqlite3.connect(self.db_path) diff --git a/trained_model_client.py b/trained_model_client.py index 7faf836..3f47e83 100644 --- a/trained_model_client.py +++ b/trained_model_client.py @@ -128,6 +128,7 @@ def _predict_batch_http(self, candidates: List[dict], manager_context: Optional[ data = None last_exc = None prediction_url = self._prediction_url() + tried_urls = set() for attempt in range(self.max_retries + 1): attempt_started = time.time() logger.info( @@ -162,6 +163,14 @@ def _predict_batch_http(self, candidates: List[dict], manager_context: Optional[ break except (requests.Timeout, requests.ConnectionError, requests.HTTPError) as exc: last_exc = exc + status_code = getattr(getattr(exc, 'response', None), 'status_code', None) + if status_code == 404: + alt_url = self._alternate_prediction_url(prediction_url) + if alt_url and alt_url not in tried_urls: + tried_urls.add(prediction_url) + prediction_url = alt_url + logger.warning("Trained model 404 on %s; retrying once with alternate url %s", tried_urls, prediction_url) + continue if attempt >= self.max_retries: raise sleep_seconds = self.backoff_seconds * (attempt + 1) @@ -213,6 +222,12 @@ def wait_until_ready(self, timeout_seconds: int = 600, poll_seconds: float = 10. last_error = str(payload.get("error") or payload) except Exception as exc: last_error = str(exc) + if self.provider in {"cerebrium", "cerebrium_full", "cerebrum"} and "404" in last_error: + # Some Cerebrium endpoint shapes do not expose a direct /health route. + # Allow inference run to continue and rely on the actual predict call as truth. + self.last_error = None + self.last_model_used = self.model_identifier + return {"ok": True, "model": self.model_identifier, "health_probe": "skipped_after_404"} self.last_error = last_error remaining = deadline - time.time() if remaining <= 0: @@ -232,6 +247,16 @@ def _prediction_url(self) -> str: return url.rstrip("/") + "/predict_trade_candidates" return url + def _alternate_prediction_url(self, url: str) -> str | None: + raw = str(url or "").strip() + if not raw: + return None + if raw.endswith('/predict_trade_candidates'): + return raw[: -len('/predict_trade_candidates')] + if self.provider in {"cerebrium", "cerebrium_full", "cerebrum"}: + return raw.rstrip('/') + '/predict_trade_candidates' + return None + def _health_url(self) -> str: url = (self.inference_url or "").strip() if not url: From 190fc536a301f5897c6d4456112c8cf986f751b3 Mon Sep 17 00:00:00 2001 From: Codex Date: Wed, 13 May 2026 15:00:16 +0000 Subject: [PATCH 2/2] Add Cerebrium predict preflight to prevent costly broken runs --- .github/workflows/daily_trading_bot.yml | 36 +++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/.github/workflows/daily_trading_bot.yml b/.github/workflows/daily_trading_bot.yml index 1a7bbae..94257f1 100644 --- a/.github/workflows/daily_trading_bot.yml +++ b/.github/workflows/daily_trading_bot.yml @@ -228,9 +228,41 @@ jobs: python wait_for_trained_model.py python warm_trained_model.py + - name: Verify Cerebrium predict endpoint + id: verify_cerebrium_predict + if: ${{ steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && ((github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) && steps.plan_ai_runtime.outputs.runtime_mode == 'cerebrium_full' }} + continue-on-error: true + timeout-minutes: 3 + env: + CEREBRIUM_INFERENCE_URL: ${{ secrets.CEREBRIUM_INFERENCE_URL }} + CEREBRIUM_API_KEY: ${{ secrets.CEREBRIUM_API_KEY || secrets.TRAINED_MODEL_API_KEY }} + run: | + python - <<'PYCODE' + import json, os, requests + url = (os.getenv('CEREBRIUM_INFERENCE_URL') or '').strip() + if not url: + raise SystemExit('CEREBRIUM_INFERENCE_URL is empty') + payload = { + 'task': 'trade_signal_classification', + 'candidates': [{'symbol':'AAPL','as_of_date':'2026-05-13','return_1d':0.01,'return_5d':0.02,'return_10d':0.03,'volatility_20d':0.02,'dist_ma_20':0.01,'dist_ma_50':0.02,'rsi_14':55,'volume_ratio':1.1,'news_count_7d':1,'news_sentiment_7d':0.1}] + } + headers = {'Content-Type':'application/json','Accept':'application/json'} + key = (os.getenv('CEREBRIUM_API_KEY') or '').strip() + if key: + headers['Authorization'] = f'Bearer {key}' + r = requests.post(url, json=payload, headers=headers, timeout=45) + print('status', r.status_code) + print((r.text or '')[:500]) + r.raise_for_status() + data = r.json() + if not isinstance(data, dict) or not isinstance(data.get('signals'), list): + raise SystemExit('Predict response missing signals list') + print('predict preflight ok') + PYCODE + - name: Run AI Trading Bot on Cerebrium id: run_ai_bot_cerebrium - if: ${{ steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && ((github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) && steps.plan_ai_runtime.outputs.runtime_mode == 'cerebrium_full' }} + if: ${{ steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && ((github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) && steps.plan_ai_runtime.outputs.runtime_mode == 'cerebrium_full' && steps.verify_cerebrium_predict.outcome == 'success' }} continue-on-error: true timeout-minutes: 90 env: @@ -323,7 +355,7 @@ jobs: - name: Run AI Trading Bot (Distilled Local Fallback) id: run_ai_bot_distilled_local - if: ${{ steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && ((github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) && (steps.plan_ai_runtime.outputs.runtime_mode == 'distilled_local' || (steps.plan_ai_runtime.outputs.runtime_mode == 'cerebrium_full' && (steps.warm_cerebrium_inference.outcome == 'failure' || steps.run_ai_bot_cerebrium.outcome == 'failure')) || (steps.plan_ai_runtime.outputs.runtime_mode == 'lightning_full' && (steps.launch_lightning_inference.outcome == 'failure' || steps.run_ai_bot_in_lightning_studio.outcome == 'failure'))) }} + if: ${{ steps.install_base_dependencies.outcome == 'success' && !(github.event_name == 'workflow_dispatch' && inputs.run_in_lightning_studio) && ((github.event_name != 'workflow_dispatch') || (github.event_name == 'workflow_dispatch' && !inputs.disable_ai_trading)) && (steps.plan_ai_runtime.outputs.runtime_mode == 'distilled_local' || (steps.plan_ai_runtime.outputs.runtime_mode == 'cerebrium_full' && (steps.warm_cerebrium_inference.outcome == 'failure' || steps.verify_cerebrium_predict.outcome == 'failure' || steps.run_ai_bot_cerebrium.outcome == 'failure')) || (steps.plan_ai_runtime.outputs.runtime_mode == 'lightning_full' && (steps.launch_lightning_inference.outcome == 'failure' || steps.run_ai_bot_in_lightning_studio.outcome == 'failure'))) }} continue-on-error: true timeout-minutes: 60 env: