
refactor: Replace ProcessPoolExecutor with asyncio for evaluation#446

Open
simonrosenberg wants to merge 9 commits into main from
openhands/asyncio-evaluation-refactor

Conversation


@simonrosenberg simonrosenberg commented Feb 25, 2026

Summary

This refactors the evaluation orchestrator from using ProcessPoolExecutor to asyncio with semaphore-based concurrency. This eliminates the 30× memory multiplication from having 30 worker processes, which was causing OOM failures in long-running SWTBench evaluations (as reported in #441).

Problem

Issue #441 documented that evaluation jobs continued to experience OOM kills on SWTBench despite the memory leak fix in #433. The root cause was that even with the parent process memory fix:

  1. Worker processes never recycle - Each of the 30 worker processes accumulates memory independently
  2. Memory fragmentation - Python heap doesn't release memory back to OS
  3. Resource factor amplification - Failed instances retry with 2x/4x/8x resources

With 30 workers and 8Gi container limit, this meant ~267 MB average per worker, but with retries and fragmentation, memory grew monotonically until OOM.

Solution

Since the workload is I/O-bound (HTTP calls to LLM proxy + runtime API), we can use asyncio for concurrent instance processing. This provides:

  • Single-process concurrency - Eliminates 30× memory multiplication entirely
  • Semaphore-based control - Limits concurrent instances to num_workers
  • Thread executor for sync ops - SDK operations run via asyncio.to_thread()
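The pattern described above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code; names like `run_all` and `process_one_sync` are placeholders:

```python
import asyncio


async def run_all(instances, num_workers, process_one_sync):
    # Bound concurrency with a semaphore instead of a process pool:
    # all tasks are created up front, but at most num_workers run at once.
    sem = asyncio.Semaphore(num_workers)

    async def run_one(instance):
        async with sem:
            # The per-instance work is synchronous (SDK calls), so it is
            # handed to a worker thread to avoid blocking the event loop.
            return await asyncio.to_thread(process_one_sync, instance)

    # gather() preserves input order in its result list.
    return await asyncio.gather(*(run_one(i) for i in instances))
```

Because the workload is I/O-bound, the threads spend most of their time blocked on HTTP calls, so a single process with `num_workers` threads achieves the same throughput as `num_workers` separate interpreters.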

Key Changes

  1. evaluation.py: Refactored from ProcessPoolExecutor to asyncio

    • New _run_iterative_mode_async() method with asyncio event loop
    • New _run_attempt_async() with semaphore and task management
    • Renamed _process_one_mp to _process_one_sync (runs in thread executor)
    • Timeout handling via asyncio.wait() with FIRST_COMPLETED
  2. Tests: Updated for new architecture

    • Added test_async_evaluation.py with 7 tests for asyncio patterns
    • Updated test_workspace_cleanup.py to use _process_one_sync
    • Removed test_keyboard_interrupt.py (was testing ProcessPoolExecutor-specific behavior)
  3. Dependencies: Added pytest-asyncio for async test support
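The timeout handling mentioned in item 1 can be sketched as a loop over `asyncio.wait()` with `FIRST_COMPLETED`, waking whenever a task finishes or the earliest deadline expires. The names (`wait_with_timeouts`, `deadlines`) are illustrative, not the PR's actual API:

```python
import asyncio
import time


async def wait_with_timeouts(tasks, deadlines):
    """Collect results; cancel any task that exceeds its absolute deadline."""
    results, pending = [], set(tasks)
    while pending:
        now = time.monotonic()
        # Wake when the earliest remaining deadline expires, or sooner
        # if any task completes first.
        timeout = max(0.0, min(deadlines[t] for t in pending) - now)
        done, pending = await asyncio.wait(
            pending, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
        )
        for t in done:
            results.append(t.result())
        for t in list(pending):
            if time.monotonic() >= deadlines[t]:
                t.cancel()  # deadline exceeded while still running
                pending.discard(t)
    return results
```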

Testing

All 39 tests pass:

tests/test_aggregate_results.py - 14 passed
tests/test_async_evaluation.py - 7 passed
tests/test_commit0_eval_infer.py - 5 passed
tests/test_instance_timeout.py - 3 passed
tests/test_iterative_resume.py - 2 passed
tests/test_metrics.py - 4 passed (benchmark discovery tests)
tests/test_workspace_cleanup.py - 4 passed

Pre-commit checks all pass (pyright, ruff, pycodestyle).

Memory Impact

Architecture          Memory Usage (30 workers)
ProcessPoolExecutor   30 × (~100-400 MB) = 3-12 GB
asyncio + threads      1 × (~200-500 MB) = 200-500 MB

The asyncio approach uses a single Python process with thread executors, eliminating the memory multiplication from separate Python interpreters.

Backward Compatibility

  • API unchanged - Evaluation.run() still works the same way
  • Configuration unchanged - num_workers still controls concurrency
  • Behavior unchanged - Same timeout handling, retry logic, etc.

Fixes #441


Tracking issue: OpenHands/evaluation#304
Umbrella issue: OpenHands/evaluation#303

This refactors the evaluation orchestrator from using ProcessPoolExecutor
to asyncio with semaphore-based concurrency. This eliminates the 30×
memory multiplication from having 30 worker processes, which was causing
OOM failures in long-running SWTBench evaluations.

Changes:
- Use asyncio.Semaphore to limit concurrent instances
- Run sync SDK operations (workspace, conversation) via asyncio.to_thread()
- Single-process async concurrency for I/O-bound workload
- Update tests for new architecture
- Add pytest-asyncio for async test support

The workload is I/O-bound (HTTP calls to LLM proxy + runtime API), so
single-process async concurrency provides the same parallelism without
the memory overhead of separate Python processes.

Fixes #441

Co-authored-by: openhands <openhands@all-hands.dev>
simonrosenberg and others added 5 commits February 26, 2026 14:17
The asyncio refactor runs sync SDK operations in thread workers via
asyncio.to_thread(). Two thread-safety bugs caused crashes:

1. redirect_stdout_stderr() replaced the global sys.stdout with a log
   file and restored it in a finally block. When multiple threads ran
   concurrently, one thread closing its log file would crash others
   still writing to it ("I/O operation on closed file").

   Fix: Use threading.local() to store per-thread log files and a
   _ThreadLocalWriter wrapper that delegates writes to the correct
   file for each thread. The wrapper also catches ValueError and
   falls back to the original stream.

2. os.environ["LMNR_SPAN_CONTEXT"] was written without
   synchronisation, allowing one thread to overwrite another's span
   context before it was consumed.

   Fix: Protect the write with a threading.Lock().
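A minimal sketch of that fix (the helper name `set_span_context` is illustrative; the env var name is from the commit message):

```python
import os
import threading

# Module-level lock guarding the shared environment variable.
_span_context_lock = threading.Lock()


def set_span_context(value: str) -> str:
    # os.environ is process-global; without the lock, two worker threads
    # could interleave write-then-consume and steal each other's context.
    with _span_context_lock:
        os.environ["LMNR_SPAN_CONTEXT"] = value
        return os.environ["LMNR_SPAN_CONTEXT"]
```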

Also: remove duplicate @pytest.mark.asyncio decorator, remove unused
sys import, run ruff format.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Three bugs caused 275/433 instances to produce no results:

1. Timeout starts at task creation, not semaphore acquisition (204 lost)
   All 433 asyncio tasks are created at once but only 30 run
   concurrently (semaphore). Instances queued behind the semaphore
   burned through their 4h timeout while waiting.

   Fix: Reset start_time to time.monotonic() inside the semaphore
   context, so the timeout counts from when the instance actually
   begins running.
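A sketch of the fixed shape (illustrative names, not the PR's actual code): the timeout clock starts only after the semaphore is acquired, so time spent queued does not count against the per-instance budget.

```python
import asyncio
import time


async def run_with_queued_timeout(sem: asyncio.Semaphore, work, timeout_s: float):
    # Instances may wait arbitrarily long here without burning timeout.
    async with sem:
        # Clock starts now, at acquisition, not at task creation.
        return await asyncio.wait_for(asyncio.to_thread(work), timeout_s)
```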

2. lmnr_span.end() raises ValueError in worker threads (37 lost)
   Laminar spans use contextvars which don't work across threads.
   lmnr_span.end() in the finally block raised "Failed to detach
   context" ValueError, crashing the thread and discarding the
   return value (including error outputs from exhausted retries).

   Fix: Wrap lmnr_span.end() in try/except.
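The fix amounts to a small guard like this (sketch; `end_span_safely` is an illustrative name):

```python
def end_span_safely(span) -> None:
    # Laminar spans use contextvars; a token created in the main thread
    # cannot be detached from a worker thread, so end() raises
    # ValueError there. Swallow it so the worker's return value
    # (including error outputs from exhausted retries) survives.
    try:
        span.end()
    except ValueError:
        pass
```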

3. Generic Exception handler drops instances silently (exacerbates #2)
   When task.result() raised from bug #2, the except handler logged
   the error but did not create an error output or call on_result.

   Fix: Create error_output and call on_result in the generic
   Exception handler.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
setup_instance_logging() modified the root logger's handlers per-call,
removing all existing handlers and adding new instance-specific ones.
In a multi-threaded environment (asyncio.to_thread workers), thread B
would remove thread A's file handler, causing thread A's logs to go
to thread B's file or be lost entirely.

Fix: Install a single pair of routing handlers on the root logger that
delegate to per-thread state via threading.local():

- _ThreadRoutedFileHandler: routes records to the FileHandler stored
  in the current thread's _logging_local.file_handler
- _ThreadRoutedConsoleHandler: applies the formatter/filter stored in
  the current thread's _logging_local for console output

The routing handlers are installed once (protected by a lock), and
subsequent calls to setup_instance_logging() only update the
thread-local state.
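The install-once routing pattern can be sketched like this, assuming simplified names (`_ThreadRoutedHandler`, `setup_instance_logging`) and omitting the console side:

```python
import logging
import threading

_logging_local = threading.local()
_install_lock = threading.Lock()
_installed = False


class _ThreadRoutedHandler(logging.Handler):
    """Route each record to the FileHandler registered by the current thread."""

    def emit(self, record: logging.LogRecord) -> None:
        fh = getattr(_logging_local, "file_handler", None)
        if fh is not None:
            fh.emit(record)  # delegate to this thread's own handler


def setup_instance_logging(file_handler: logging.FileHandler) -> None:
    global _installed
    # Install the routing handler on the root logger exactly once;
    # later calls only touch thread-local state, never root handlers.
    with _install_lock:
        if not _installed:
            logging.getLogger().addHandler(_ThreadRoutedHandler())
            _installed = True
    _logging_local.file_handler = file_handler
```

Because the root logger's handler list is never mutated after installation, thread B can no longer remove thread A's file handler mid-write.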

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Set start_time=inf at task creation so instances waiting in semaphore
  queue can't timeout before they start running
- Add fallback in thread-routed logging handlers so main asyncio thread
  warnings/errors are written to stderr/stdout instead of silently dropped
- Retry instances with no output at all in previous attempt, not just
  critic-flagged failures

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Resolve conflicts: keep asyncio evaluation.py and console_logging.py
from feature branch, take main's versions for all other files.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@simonrosenberg simonrosenberg force-pushed the openhands/asyncio-evaluation-refactor branch from 09c9fb1 to 863df2a on March 4, 2026 13:21
Resolve conflicts by keeping asyncio architecture from feature branch
while incorporating improvements from main:

- Update Laminar API to two-phase datapoint linking (create datapoint
  immediately for UI, link trace when worker starts)
- Add _cleanup_workspace helper method
- Add _execute_single_attempt method for cleaner retry logic
- Fix _get_instances_for_attempt to retry instances with no output
  in any prior attempt (never_completed set)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@simonrosenberg simonrosenberg self-assigned this Mar 5, 2026
The _ThreadRoutedConsoleHandler was writing WARNING+ level messages
to sys.__stdout__ when no thread-local formatter was set (for the
main asyncio event loop thread). This caused OpenTelemetry context
detachment errors to be written to stdout, corrupting the JSON
output that shell scripts parse with jq.

Changed the fallback to write to sys.__stderr__ instead, matching
the behavior of _ThreadRoutedFileHandler.

This fixes the "jq: parse error: Invalid numeric literal" failures
in gaia, swebench, and swtbench benchmarks.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@juanmichelini juanmichelini self-requested a review March 6, 2026 16:32

juanmichelini commented Mar 6, 2026

This looks interesting!

Testing it with GLM-5 and swebenchmultimodal, which has some deadlocks: https://github.com/OpenHands/software-agent-sdk/actions/runs/22772705315
Also testing it against the SWTBench images build: https://github.com/OpenHands/software-agent-sdk/actions/runs/22772664611

@simonrosenberg simonrosenberg marked this pull request as ready for review March 9, 2026 11:50

@all-hands-bot all-hands-bot left a comment
Taste Rating: 🟡 Acceptable

The core refactor from ProcessPoolExecutor to asyncio is solid engineering that solves a real problem (30× memory multiplication). However, the implementation has unnecessary special-case complexity that violates the "good taste" principle of eliminating edge cases rather than handling them.


Critical Issues

🔴 Missing Integration Test: New tests only verify asyncio primitives work, not the actual evaluation flow. Need an end-to-end test with real Evaluation instance.

🔴 Missing Evidence: PR description shows pytest output but no actual benchmark run. Per project principles, need to prove this works in production:

  • Run real SWE-Bench/GAIA evaluation with a few instances
  • Show command + full output/logs
  • Demonstrate memory stays bounded (the whole point!)
  • Include agent conversation URL if applicable

Design Issues

🟠 Thread-Local Logging Complexity: Fallback logic for "threads without setup" creates multiple special-case paths. Should ensure ALL threads have proper setup instead of conditional fallbacks.

🟡 Timeout Sentinel Value: Using float("inf") while queued is clever but still a special case. Better to make start_time Optional and set when work begins.

🟡 Magic Attribute Forwarding: __getattr__ in _ThreadLocalWriter could hide bugs. Consider explicit property forwarding for needed attributes.


See inline comments for specific locations.

Comment on lines +380 to +391
        fh: logging.FileHandler | None = getattr(_logging_local, "file_handler", None)
        if fh is not None:
            # Apply our formatter and delegate to per-thread file handler
            record_msg = self.format(record)
            try:
                fh.stream.write(record_msg + "\n")
                fh.stream.flush()
            except Exception:
                pass
        elif record.levelno >= logging.WARNING:
            # Fallback for threads without per-thread setup (e.g. main
            # asyncio event loop thread): write warnings+ to stderr so

🟠 Important - Special Case Complexity: This fallback logic for "threads without per-thread setup" is a design smell. Why do some threads not have setup? The comment mentions "main asyncio event loop thread" but that is a special case you are handling rather than eliminating.

Better design: Ensure ALL threads that emit logs have proper setup, eliminating this fallback path entirely. If the main thread needs different behavior, set up its handlers explicitly rather than using conditional fallback logic.

Comment on lines +407 to +423

    def __init__(self) -> None:
        super().__init__(level=logging.INFO)

    def emit(self, record: logging.LogRecord) -> None:
        fmt = getattr(_logging_local, "console_formatter", None)
        filt = getattr(_logging_local, "console_filter", None)
        level = getattr(_logging_local, "console_level", logging.WARNING)
        if record.levelno < level:
            if filt and not filt.filter(record):
                return
            elif not filt:
                return
        stream = sys.__stdout__
        if fmt and stream:
            try:
                msg = fmt.format(record)

🟠 Important - More Special Case Handling: Same issue as above - complex conditional logic for threads without setup. This creates three code paths: (1) Thread has formatter → write to stdout, (2) Thread lacks formatter but WARNING+ → write to stderr, (3) Everything else → drop silently.

This is exactly the kind of special-case proliferation that makes code unmaintainable. Simplify: Set up logging for all threads, eliminate the fallback paths.

Comment on lines +483 to +485
            task = asyncio.current_task()
            if task and task in pending_instances:
                pending_instances[task].start_time = time.monotonic()

🟡 Suggestion - Timeout Sentinel Value: Using float("inf") as a sentinel to prevent timeout while queued is clever, but it is still a special case. The comment acknowledges this: "Don't timeout while queued; reset when semaphore acquired."

Cleaner approach: Don't track start_time at all until the semaphore is acquired. Change PendingInstance to make start_time Optional, set it to None initially, then set it when work begins. This eliminates the sentinel value and makes the state machine explicit.

Comment on lines 973 to +1006
    if log_file is not None and not log_file.closed:
        log_file.close()


class _ThreadLocalWriter:
    """A sys.stdout / sys.stderr replacement that writes to a per-thread file.

    If the current thread has set ``_thread_local.log_file``, writes go there.
    Otherwise writes fall through to the original stream (usually the real
    terminal stdout / stderr).
    """

    def __init__(self, original):
        self._original = original

    # --- file-like API used by print() and the logging module -----------------

    def write(self, s: str) -> int:
        target = getattr(_thread_local, "log_file", None) or self._original
        try:
            return target.write(s)
        except ValueError:
            # Handle "I/O operation on closed file" gracefully –
            # fall back to original stream instead of crashing.
            return self._original.write(s)

    def flush(self) -> None:
        target = getattr(_thread_local, "log_file", None) or self._original
        try:
            target.flush()
        except ValueError:
            self._original.flush()

    # --- forward attribute access for anything else (encoding, fileno, etc.) --

🟡 Suggestion - Magic Attribute Forwarding: The __getattr__ fallback is convenient but could hide bugs if the original stream and log file have different attributes or behavior.

More explicit approach: Define properties for the specific attributes you need (encoding, closed, etc.) rather than using magic __getattr__ forwarding.

    if runtime_failure_count > 0:
        logger.warning(
-           f"[child] Instance {instance.id}: "
+           f"[worker] Instance {instance.id}: "

🟢 Good: Thread-safe environment variable access. This lock prevents race conditions where one thread overwrites another's span context.

Comment on lines +904 to 908
            pass


-# ---------- Multiprocessing logging helper ---------------------------------------
+# ---------- Thread-safety helpers ------------------------------------------------


🟢 Acceptable - Pragmatic Workaround: Catching and ignoring span.end() exceptions is a workaround for Python's contextvars limitation (tokens created in main thread cannot be detached from worker threads). This is a real constraint, not over-engineering.

Comment on lines +1 to +10
"""Tests for asyncio-based evaluation orchestration.

These tests verify the asyncio refactor from ProcessPoolExecutor to asyncio
with semaphore-based concurrency and asyncio.to_thread() for sync operations.
"""

import asyncio
import time

import pytest

🔴 Critical - Testing Gap: These tests only verify asyncio primitives (Semaphore, wait, to_thread, etc.) work correctly. They do not test the actual evaluation logic.

Missing: An integration test that: (1) Creates a real Evaluation instance with test config, (2) Runs actual instances through _run_iterative_mode_async, (3) Verifies outputs are correct and thread-safe, (4) Tests error handling and timeout behavior in the real flow.

The removed test_keyboard_interrupt.py also needs replacement - how does SIGINT work with asyncio now?



Development

Successfully merging this pull request may close these issues.

OOM failures persist in SWTBench evaluations despite fix in #433

4 participants