diff --git a/pathview/app.py b/pathview/app.py index 9a757f4..13615fc 100644 --- a/pathview/app.py +++ b/pathview/app.py @@ -21,14 +21,13 @@ from flask import Flask, request, jsonify, send_from_directory from flask_cors import CORS -# --------------------------------------------------------------------------- -# Configuration -# --------------------------------------------------------------------------- - -SESSION_TTL = 3600 # 1 hour of inactivity before cleanup -CLEANUP_INTERVAL = 60 # Check for stale sessions every 60 seconds -EXEC_TIMEOUT = 35 # Server-side timeout for exec/eval (slightly > worker's 30s) -WORKER_SCRIPT = str(Path(__file__).parent / "worker.py") +from pathview.config import ( + WORKER_SCRIPT, + SERVER_TIMEOUT, + INIT_TIMEOUT, + SESSION_TTL, + CLEANUP_INTERVAL, +) # --------------------------------------------------------------------------- # Session management @@ -80,7 +79,7 @@ def read_line(self) -> dict | None: except json.JSONDecodeError: continue - def read_line_timeout(self, timeout: float = EXEC_TIMEOUT) -> dict | None: + def read_line_timeout(self, timeout: float = SERVER_TIMEOUT) -> dict | None: """Read one JSON line with a timeout. Returns None on EOF or timeout. Raises TimeoutError if no response within the timeout period. @@ -116,7 +115,7 @@ def ensure_initialized(self, packages: list[dict] | None = None) -> list[dict]: init_msg["packages"] = packages self.send_message(init_msg) while True: - resp = self.read_line() + resp = self.read_line_timeout(timeout=INIT_TIMEOUT) if resp is None: raise RuntimeError("Worker process died during initialization") messages.append(resp) @@ -480,7 +479,8 @@ def api_stream_exec(): if not session: return jsonify({"error": "No active session"}), 404 try: - session.send_message({"type": "stream-exec", "code": code}) + with session.lock: + session.send_message({"type": "stream-exec", "code": code}) return jsonify({"status": "queued"}) except Exception as e: return jsonify({"error": str(e)}), 500 @@ -497,7 +497,8 @@ def api_stream_stop(): if not session: return jsonify({"status": "stopped"}) try: - session.send_message({"type": "stream-stop"}) + with session.lock: + session.send_message({"type": "stream-stop"}) return jsonify({"status": "stopped"}) except Exception as e: return jsonify({"error": str(e)}), 500 diff --git a/pathview/config.py b/pathview/config.py new file mode 100644 index 0000000..623a0c4 --- /dev/null +++ b/pathview/config.py @@ -0,0 +1,24 @@ +"""Shared configuration constants for the PathView backend.""" + +from pathlib import Path + +# --------------------------------------------------------------------------- +# Paths +# --------------------------------------------------------------------------- + +WORKER_SCRIPT = str(Path(__file__).parent / "worker.py") + +# --------------------------------------------------------------------------- +# Timeouts (seconds) +# --------------------------------------------------------------------------- + +EXEC_TIMEOUT = 30 # Per exec/eval call in the worker +SERVER_TIMEOUT = 35 # Server-side read timeout (slightly > worker's EXEC_TIMEOUT) +INIT_TIMEOUT = 120 # Initialization / pip install (matches frontend TIMEOUTS.INIT) + +# --------------------------------------------------------------------------- +# Session management +# --------------------------------------------------------------------------- + +SESSION_TTL = 3600 # Inactive session cleanup after 1 hour +CLEANUP_INTERVAL = 60 # Check for stale sessions every 60 seconds diff --git a/pathview/worker.py b/pathview/worker.py index 9b8e71d..05b762b 100644 --- a/pathview/worker.py +++ b/pathview/worker.py @@ -23,6 +23,8 @@ import queue import ctypes +from pathview.config import EXEC_TIMEOUT + # Lock for thread-safe stdout writing (protocol messages only) _stdout_lock = threading.Lock() @@ -41,9 +43,6 @@ _namespace = {} _initialized = False -# Default timeout for exec/eval (seconds) -EXEC_TIMEOUT = 30 - # Streaming state _streaming_active = False _streaming_code_queue = queue.Queue() @@ -335,9 +334,17 @@ def run_streaming_loop(msg_id: str, expr: str) -> None: except Exception as e: send({"type": "stderr", "value": f"Stream exec error: {e}"}) - # Step the generator - exec_code_str = f"_eval_result = {expr}" - exec(exec_code_str, _namespace) + # Step the generator (with timeout so a stuck C extension + # doesn't hang the worker forever) + try: + _run_with_timeout( + lambda: exec(f"_eval_result = {expr}", _namespace), + timeout=EXEC_TIMEOUT, + ) + except TimeoutError: + send({"type": "error", "id": msg_id, + "error": f"Simulation step timed out after {EXEC_TIMEOUT}s"}) + break raw_result = _namespace["_eval_result"] done = raw_result.get("done", False) if isinstance(raw_result, dict) else False