Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions pathview/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@
from flask import Flask, request, jsonify, send_from_directory
from flask_cors import CORS

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

SESSION_TTL = 3600 # 1 hour of inactivity before cleanup
CLEANUP_INTERVAL = 60 # Check for stale sessions every 60 seconds
EXEC_TIMEOUT = 35 # Server-side timeout for exec/eval (slightly > worker's 30s)
WORKER_SCRIPT = str(Path(__file__).parent / "worker.py")
from pathview.config import (
WORKER_SCRIPT,
SERVER_TIMEOUT,
INIT_TIMEOUT,
SESSION_TTL,
CLEANUP_INTERVAL,
)

# ---------------------------------------------------------------------------
# Session management
Expand Down Expand Up @@ -80,7 +79,7 @@ def read_line(self) -> dict | None:
except json.JSONDecodeError:
continue

def read_line_timeout(self, timeout: float = EXEC_TIMEOUT) -> dict | None:
def read_line_timeout(self, timeout: float = SERVER_TIMEOUT) -> dict | None:
"""Read one JSON line with a timeout. Returns None on EOF or timeout.

Raises TimeoutError if no response within the timeout period.
Expand Down Expand Up @@ -116,7 +115,7 @@ def ensure_initialized(self, packages: list[dict] | None = None) -> list[dict]:
init_msg["packages"] = packages
self.send_message(init_msg)
while True:
resp = self.read_line()
resp = self.read_line_timeout(timeout=INIT_TIMEOUT)
if resp is None:
raise RuntimeError("Worker process died during initialization")
messages.append(resp)
Expand Down Expand Up @@ -480,7 +479,8 @@ def api_stream_exec():
if not session:
return jsonify({"error": "No active session"}), 404
try:
session.send_message({"type": "stream-exec", "code": code})
with session.lock:
session.send_message({"type": "stream-exec", "code": code})
return jsonify({"status": "queued"})
except Exception as e:
return jsonify({"error": str(e)}), 500
Expand All @@ -497,7 +497,8 @@ def api_stream_stop():
if not session:
return jsonify({"status": "stopped"})
try:
session.send_message({"type": "stream-stop"})
with session.lock:
session.send_message({"type": "stream-stop"})
return jsonify({"status": "stopped"})
except Exception as e:
return jsonify({"error": str(e)}), 500
Expand Down
24 changes: 24 additions & 0 deletions pathview/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Shared configuration constants for the PathView backend."""

from pathlib import Path

# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------

WORKER_SCRIPT = str(Path(__file__).parent / "worker.py")

# ---------------------------------------------------------------------------
# Timeouts (seconds)
# ---------------------------------------------------------------------------

EXEC_TIMEOUT = 30 # Per exec/eval call in the worker
SERVER_TIMEOUT = 35 # Server-side read timeout (slightly > worker's EXEC_TIMEOUT)
INIT_TIMEOUT = 120 # Initialization / pip install (matches frontend TIMEOUTS.INIT)

# ---------------------------------------------------------------------------
# Session management
# ---------------------------------------------------------------------------

SESSION_TTL = 3600 # Inactive session cleanup after 1 hour
CLEANUP_INTERVAL = 60 # Check for stale sessions every 60 seconds
19 changes: 13 additions & 6 deletions pathview/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import queue
import ctypes

from pathview.config import EXEC_TIMEOUT

# Lock for thread-safe stdout writing (protocol messages only)
_stdout_lock = threading.Lock()

Expand All @@ -41,9 +43,6 @@
_namespace = {}
_initialized = False

# Default timeout for exec/eval (seconds)
EXEC_TIMEOUT = 30

# Streaming state
_streaming_active = False
_streaming_code_queue = queue.Queue()
Expand Down Expand Up @@ -335,9 +334,17 @@ def run_streaming_loop(msg_id: str, expr: str) -> None:
except Exception as e:
send({"type": "stderr", "value": f"Stream exec error: {e}"})

# Step the generator
exec_code_str = f"_eval_result = {expr}"
exec(exec_code_str, _namespace)
# Step the generator (with timeout so a stuck C extension
# doesn't hang the worker forever)
try:
_run_with_timeout(
lambda: exec(f"_eval_result = {expr}", _namespace),
timeout=EXEC_TIMEOUT,
)
except TimeoutError:
send({"type": "error", "id": msg_id,
"error": f"Simulation step timed out after {EXEC_TIMEOUT}s"})
break
raw_result = _namespace["_eval_result"]
done = raw_result.get("done", False) if isinstance(raw_result, dict) else False

Expand Down