refactor: Replace ProcessPoolExecutor with asyncio for evaluation#446
simonrosenberg wants to merge 9 commits into main from
Conversation
This refactors the evaluation orchestrator from using ProcessPoolExecutor to asyncio with semaphore-based concurrency. This eliminates the 30× memory multiplication from having 30 worker processes, which was causing OOM failures in long-running SWTBench evaluations.

Changes:
- Use asyncio.Semaphore to limit concurrent instances
- Run sync SDK operations (workspace, conversation) via asyncio.to_thread()
- Single-process async concurrency for the I/O-bound workload
- Update tests for the new architecture
- Add pytest-asyncio for async test support

The workload is I/O-bound (HTTP calls to the LLM proxy and runtime API), so single-process async concurrency provides the same parallelism without the memory overhead of separate Python processes.

Fixes #441

Co-authored-by: openhands <openhands@all-hands.dev>
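The concurrency pattern described above can be sketched as a minimal standalone model (not the SDK code; `run_all` and `process_one_sync` are hypothetical names, and the limit of 30 mirrors the old worker-process count):

```python
import asyncio

MAX_CONCURRENCY = 30  # assumption: mirrors the old worker-process count


async def run_all(instances, process_one_sync):
    """Run the sync per-instance worker concurrently, bounded by a semaphore."""
    sem = asyncio.Semaphore(MAX_CONCURRENCY)

    async def run_one(inst):
        async with sem:
            # Sync SDK calls stay blocking; to_thread moves them off the event loop.
            return await asyncio.to_thread(process_one_sync, inst)

    # gather preserves input order in its result list
    return await asyncio.gather(*(run_one(i) for i in instances))
```

Because the semaphore bounds only the in-flight `to_thread` calls, all tasks can be created up front without spawning 30 interpreters.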
The asyncio refactor runs sync SDK operations in thread workers via
asyncio.to_thread(). Two thread-safety bugs caused crashes:
1. redirect_stdout_stderr() replaced the global sys.stdout with a log
file and restored it in a finally block. When multiple threads ran
concurrently, one thread closing its log file would crash others
still writing to it ("I/O operation on closed file").
Fix: Use threading.local() to store per-thread log files and a
_ThreadLocalWriter wrapper that delegates writes to the correct
file for each thread. The wrapper also catches ValueError and
falls back to the original stream.
2. os.environ["LMNR_SPAN_CONTEXT"] was written without
synchronisation, allowing one thread to overwrite another's span
context before it was consumed.
Fix: Protect the write with a threading.Lock().
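A minimal sketch of that fix, assuming a module-level lock (the helper name `set_span_context` is hypothetical):

```python
import os
import threading

_span_env_lock = threading.Lock()  # hypothetical module-level lock


def set_span_context(serialized: str) -> None:
    # os.environ is process-global: without a lock, one worker thread can
    # overwrite another thread's span context before it is consumed.
    # The consuming side should hold the same lock.
    with _span_env_lock:
        os.environ["LMNR_SPAN_CONTEXT"] = serialized
```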
Also: remove duplicate @pytest.mark.asyncio decorator, remove unused
sys import, run ruff format.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Three bugs caused 275/433 instances to produce no results:

1. Timeout starts at task creation, not semaphore acquisition (204 lost).
   All 433 asyncio tasks are created at once, but only 30 run concurrently (semaphore). Instances queued behind the semaphore burned through their 4 h timeout while waiting.
   Fix: Reset start_time to time.monotonic() inside the semaphore context, so the timeout counts from when the instance actually begins running.

2. lmnr_span.end() raises ValueError in worker threads (37 lost).
   Laminar spans use contextvars, which don't work across threads. lmnr_span.end() in the finally block raised "Failed to detach context" ValueError, crashing the thread and discarding the return value (including error outputs from exhausted retries).
   Fix: Wrap lmnr_span.end() in try/except.

3. Generic Exception handler drops instances silently (exacerbates #2).
   When task.result() raised from bug #2, the except handler logged the error but did not create an error output or call on_result.
   Fix: Create error_output and call on_result in the generic Exception handler.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
setup_instance_logging() modified the root logger's handlers per call, removing all existing handlers and adding new instance-specific ones. In a multi-threaded environment (asyncio.to_thread workers), thread B would remove thread A's file handler, causing thread A's logs to go to thread B's file or be lost entirely.

Fix: Install a single pair of routing handlers on the root logger that delegate to per-thread state via threading.local():

- _ThreadRoutedFileHandler: routes records to the FileHandler stored in the current thread's _logging_local.file_handler
- _ThreadRoutedConsoleHandler: applies the formatter/filter stored in the current thread's _logging_local for console output

The routing handlers are installed once (protected by a lock), and subsequent calls to setup_instance_logging() only update the thread-local state.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Set start_time=inf at task creation so instances waiting in the semaphore queue can't time out before they start running
- Add a fallback in the thread-routed logging handlers so main asyncio-thread warnings/errors are written to stderr/stdout instead of being silently dropped
- Retry instances with no output at all in the previous attempt, not just critic-flagged failures

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Resolve conflicts: keep asyncio evaluation.py and console_logging.py from feature branch, take main's versions for all other files. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Force-pushed from 09c9fb1 to 863df2a
Resolve conflicts by keeping the asyncio architecture from the feature branch while incorporating improvements from main:

- Update the Laminar API to two-phase datapoint linking (create the datapoint immediately for the UI, link the trace when the worker starts)
- Add a _cleanup_workspace helper method
- Add an _execute_single_attempt method for cleaner retry logic
- Fix _get_instances_for_attempt to retry instances with no output in any prior attempt (never_completed set)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
The _ThreadRoutedConsoleHandler was writing WARNING+ level messages to sys.__stdout__ when no thread-local formatter was set (i.e. for the main asyncio event loop thread). This caused OpenTelemetry context-detachment errors to be written to stdout, corrupting the JSON output that shell scripts parse with jq.

Changed the fallback to write to sys.__stderr__ instead, matching the behavior of _ThreadRoutedFileHandler. This fixes the "jq: parse error: Invalid numeric literal" failures in the gaia, swebench, and swtbench benchmarks.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This looks interesting! Testing it with GLM-5 and swebenchmultimodal, which has some deadlocks: https://github.com/OpenHands/software-agent-sdk/actions/runs/22772705315
all-hands-bot left a comment
Taste Rating: 🟡 Acceptable
The core refactor from ProcessPoolExecutor to asyncio is solid engineering that solves a real problem (30× memory multiplication). However, the implementation has unnecessary special-case complexity that violates the "good taste" principle of eliminating edge cases rather than handling them.
Critical Issues
🔴 Missing Integration Test: New tests only verify asyncio primitives work, not the actual evaluation flow. Need an end-to-end test with real Evaluation instance.
🔴 Missing Evidence: PR description shows pytest output but no actual benchmark run. Per project principles, need to prove this works in production:
- Run real SWE-Bench/GAIA evaluation with a few instances
- Show command + full output/logs
- Demonstrate memory stays bounded (the whole point!)
- Include agent conversation URL if applicable
Design Issues
🟠 Thread-Local Logging Complexity: Fallback logic for "threads without setup" creates multiple special-case paths. Should ensure ALL threads have proper setup instead of conditional fallbacks.
🟡 Timeout Sentinel Value: Using float("inf") while queued is clever but still a special case. Better to make start_time Optional and set when work begins.
🟡 Magic Attribute Forwarding: __getattr__ in _ThreadLocalWriter could hide bugs. Consider explicit property forwarding for needed attributes.
See inline comments for specific locations.
```python
fh: logging.FileHandler | None = getattr(_logging_local, "file_handler", None)
if fh is not None:
    # Apply our formatter and delegate to per-thread file handler
    record_msg = self.format(record)
    try:
        fh.stream.write(record_msg + "\n")
        fh.stream.flush()
    except Exception:
        pass
elif record.levelno >= logging.WARNING:
    # Fallback for threads without per-thread setup (e.g. main
    # asyncio event loop thread): write warnings+ to stderr so
```
🟠 Important - Special Case Complexity: This fallback logic for "threads without per-thread setup" is a design smell. Why do some threads not have setup? The comment mentions "main asyncio event loop thread" but that is a special case you are handling rather than eliminating.
Better design: Ensure ALL threads that emit logs have proper setup, eliminating this fallback path entirely. If the main thread needs different behavior, set up its handlers explicitly rather than using conditional fallback logic.
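As a hedged sketch of that suggestion, the main thread could be given an explicit stderr handler up front instead of relying on a fallback branch (the function name, handler level, and format string are all assumptions, not the PR's code):

```python
import logging
import sys


def setup_main_thread_logging() -> None:
    """Give the main asyncio thread an explicit stderr handler so the
    routing handlers never need a 'no per-thread setup' fallback path."""
    handler = logging.StreamHandler(sys.stderr)
    handler.setLevel(logging.WARNING)  # assumption: warnings+ only on the main thread
    handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
    logging.getLogger().addHandler(handler)
```

Called once at startup, this makes the main thread's behavior an explicit configuration rather than an implicit code path inside emit().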
```python
def __init__(self) -> None:
    super().__init__(level=logging.INFO)

def emit(self, record: logging.LogRecord) -> None:
    fmt = getattr(_logging_local, "console_formatter", None)
    filt = getattr(_logging_local, "console_filter", None)
    level = getattr(_logging_local, "console_level", logging.WARNING)
    if record.levelno < level:
        if filt and not filt.filter(record):
            return
        elif not filt:
            return
    stream = sys.__stdout__
    if fmt and stream:
        try:
            msg = fmt.format(record)
```
🟠 Important - More Special Case Handling: Same issue as above - complex conditional logic for threads without setup. This creates three code paths: (1) Thread has formatter → write to stdout, (2) Thread lacks formatter but WARNING+ → write to stderr, (3) Everything else → drop silently.
This is exactly the kind of special-case proliferation that makes code unmaintainable. Simplify: Set up logging for all threads, eliminate the fallback paths.
```python
task = asyncio.current_task()
if task and task in pending_instances:
    pending_instances[task].start_time = time.monotonic()
```
🟡 Suggestion - Timeout Sentinel Value: Using float("inf") as a sentinel to prevent timeout while queued is clever, but it is still a special case. The comment acknowledges this: "Don't timeout while queued; reset when semaphore acquired."
Cleaner approach: Don't track start_time at all until the semaphore is acquired. Change PendingInstance to make start_time Optional, set it to None initially, then set it when work begins. This eliminates the sentinel value and makes the state machine explicit.
```python
if log_file is not None and not log_file.closed:
    log_file.close()


class _ThreadLocalWriter:
    """A sys.stdout / sys.stderr replacement that writes to a per-thread file.

    If the current thread has set ``_thread_local.log_file``, writes go there.
    Otherwise writes fall through to the original stream (usually the real
    terminal stdout / stderr).
    """

    def __init__(self, original):
        self._original = original

    # --- file-like API used by print() and the logging module -----------------

    def write(self, s: str) -> int:
        target = getattr(_thread_local, "log_file", None) or self._original
        try:
            return target.write(s)
        except ValueError:
            # Handle "I/O operation on closed file" gracefully –
            # fall back to original stream instead of crashing.
            return self._original.write(s)

    def flush(self) -> None:
        target = getattr(_thread_local, "log_file", None) or self._original
        try:
            target.flush()
        except ValueError:
            self._original.flush()

    # --- forward attribute access for anything else (encoding, fileno, etc.) --
```
🟡 Suggestion - Magic Attribute Forwarding: The __getattr__ fallback is convenient but could hide bugs if the original stream and log file have different attributes or behavior.
More explicit approach: Define properties for the specific attributes you need (encoding, closed, etc.) rather than using magic __getattr__ forwarding.
```diff
 if runtime_failure_count > 0:
     logger.warning(
-        f"[child] Instance {instance.id}: "
+        f"[worker] Instance {instance.id}: "
```
🟢 Good: Thread-safe environment variable access. This lock prevents race conditions where one thread overwrites another's span context.
```diff
     pass


-# ---------- Multiprocessing logging helper ---------------------------------------
+# ---------- Thread-safety helpers ------------------------------------------------
```
🟢 Acceptable - Pragmatic Workaround: Catching and ignoring span.end() exceptions is a workaround for Python's contextvars limitation (tokens created in main thread cannot be detached from worker threads). This is a real constraint, not over-engineering.
```python
"""Tests for asyncio-based evaluation orchestration.

These tests verify the asyncio refactor from ProcessPoolExecutor to asyncio
with semaphore-based concurrency and asyncio.to_thread() for sync operations.
"""

import asyncio
import time

import pytest
```
🔴 Critical - Testing Gap: These tests only verify asyncio primitives (Semaphore, wait, to_thread, etc.) work correctly. They do not test the actual evaluation logic.
Missing: An integration test that: (1) Creates a real Evaluation instance with test config, (2) Runs actual instances through _run_iterative_mode_async, (3) Verifies outputs are correct and thread-safe, (4) Tests error handling and timeout behavior in the real flow.
The removed test_keyboard_interrupt.py also needs replacement - how does SIGINT work with asyncio now?
Summary
This refactors the evaluation orchestrator from using ProcessPoolExecutor to asyncio with semaphore-based concurrency. This eliminates the 30× memory multiplication from having 30 worker processes, which was causing OOM failures in long-running SWTBench evaluations (as reported in #441).

Problem

Issue #441 documented that evaluation jobs continued to experience OOM kills on SWTBench despite the memory leak fix in #433. The root cause was that even with the parent-process memory fix, each of the 30 workers was a separate Python interpreter, multiplying baseline memory usage. With 30 workers and an 8 Gi container limit, this meant ~267 MB average per worker, but with retries and fragmentation, memory grew monotonically until OOM.
Solution
Since the workload is I/O-bound (HTTP calls to the LLM proxy and runtime API), we can use asyncio for concurrent instance processing. This provides:

- The same concurrency level, still controlled by num_workers
- Sync SDK operations run off the event loop via asyncio.to_thread()
- A single Python process, so no per-worker interpreter memory

Key Changes

evaluation.py: Refactored from ProcessPoolExecutor to asyncio

- New _run_iterative_mode_async() method with an asyncio event loop
- _run_attempt_async() with semaphore and task management
- _process_one_mp → _process_one_sync (runs in a thread executor)
- asyncio.wait() with FIRST_COMPLETED

Tests: Updated for the new architecture

- Added test_async_evaluation.py with 7 tests for asyncio patterns
- Updated test_workspace_cleanup.py to use _process_one_sync
- Removed test_keyboard_interrupt.py (was testing ProcessPoolExecutor-specific behavior)

Dependencies: Added pytest-asyncio for async test support

Testing

All 39 tests pass. Pre-commit checks all pass (pyright, ruff, pycodestyle).
Memory Impact
The asyncio approach uses a single Python process with thread executors, eliminating the memory multiplication from separate Python interpreters.
Backward Compatibility
- Evaluation.run() still works the same way
- num_workers still controls concurrency

Fixes #441
Tracking issue: OpenHands/evaluation#304
Umbrella issue: OpenHands/evaluation#303