From 706bc1928cf3c22e14fe022f605abb1cd5b1a5ba Mon Sep 17 00:00:00 2001 From: cemde <42615086+cemde@users.noreply.github.com> Date: Tue, 12 May 2026 12:04:54 +0200 Subject: [PATCH 1/2] fix implemented --- maseval/core/benchmark.py | 155 ++++++---- .../test_benchmark/test_report_schema.py | 281 ++++++++++++++++++ 2 files changed, 386 insertions(+), 50 deletions(-) create mode 100644 tests/test_core/test_benchmark/test_report_schema.py diff --git a/maseval/core/benchmark.py b/maseval/core/benchmark.py index d59ad233..68ce11fd 100644 --- a/maseval/core/benchmark.py +++ b/maseval/core/benchmark.py @@ -519,6 +519,64 @@ def _invoke_callbacks(self, method_name: str, *args, suppress_errors: bool = Tru return errors + def _build_report( + self, + task: Task, + repeat_idx: int, + status: TaskExecutionStatus, + *, + error: Optional[Dict[str, Any]] = None, + traces: Optional[Dict[str, Any]] = None, + config: Optional[Dict[str, Any]] = None, + usage: Optional[Dict[str, Any]] = None, + eval_results: Any = None, + ) -> Dict[str, Any]: + """Build a task-repetition report with the canonical schema. + + Every report — success or failure, from setup, execution or evaluation, + and from sequential or parallel runs — carries the same top-level keys so + downstream consumers can rely on a stable schema. ``error`` is ``None`` + only for ``SUCCESS``; for any other status it is always populated (a + generic placeholder is synthesised if a caller forgot to supply one). + + Args: + task: The task this report describes. + repeat_idx: Repetition index (0 to n_task_repeats-1). + status: Final execution status for this repetition. + error: Error details dict (``error_type``/``error_message``/``traceback``, + plus any extra fields), or ``None`` if the repetition succeeded. + traces: Collected execution traces (defaults to ``{}`` when not available). + config: Collected component/benchmark configuration (defaults to ``{}``). + usage: Collected usage totals, or ``None`` if not collected. + eval_results: Evaluation results, or ``None`` if evaluation was skipped or failed. + + Returns: + Report dictionary with keys: ``task_id``, ``repeat_idx``, ``status``, + ``error``, ``traces``, ``config``, ``usage``, ``eval``, ``task``. + """ + if status is not TaskExecutionStatus.SUCCESS and error is None: + # Defensive: a non-success report must always carry error details. + error = { + "error_type": "UnknownError", + "error_message": f"Task ended with status '{status.value}' but no error details were recorded.", + "traceback": "", + } + return { + "task_id": str(task.id), + "repeat_idx": repeat_idx, + "status": status.value, + "error": error, + "traces": traces if traces is not None else {}, + "config": config if config is not None else {}, + "usage": usage, + "eval": eval_results, + "task": { + "query": task.query, + "metadata": dict(task.metadata), + "protocol": task.protocol.to_dict(), + }, + } + def _append_report_safe(self, report: Dict[str, Any]) -> None: """Append a report to the reports list (thread-safe). 
@@ -1089,16 +1147,14 @@ def _execute_task_repetition( "traceback": "".join(traceback.format_exception(type(e), e, e.__traceback__)), } - # Create a minimal report for this timeout - report = { - "task_id": str(task.id), - "repeat_idx": repeat_idx, - "status": execution_status.value, - "error": error_info, - "traces": e.partial_traces, - "config": {}, - "eval": None, - } + # Create a minimal report for this timeout (canonical schema) + report = self._build_report( + task, + repeat_idx, + execution_status, + error=error_info, + traces=e.partial_traces, + ) self.clear_registry() return report @@ -1111,22 +1167,13 @@ def _execute_task_repetition( "traceback": "".join(traceback.format_exception(type(e), e, e.__traceback__)), } - # Create a minimal report for this failed setup - report = { - "task_id": str(task.id), - "repeat_idx": repeat_idx, - "status": execution_status.value, - "error": error_info, - "traces": {}, - "config": {}, - "eval": None, - } self.clear_registry() if self.fail_on_setup_error: raise - return report + # Create a minimal report for this failed setup (canonical schema) + return self._build_report(task, repeat_idx, execution_status, error=error_info) # 2. Execute agent system with optional user interaction loop try: @@ -1265,21 +1312,16 @@ def _execute_task_repetition( eval_results = None # 5. Build report — all keys always present for consistent schema - report: Dict[str, Any] = { - "task_id": str(task.id), - "repeat_idx": repeat_idx, - "status": execution_status.value, - "error": error_info, - "traces": execution_traces, - "config": execution_configs, - "usage": execution_usage, - "eval": eval_results, - "task": { - "query": task.query, - "metadata": dict(task.metadata), - "protocol": task.protocol.to_dict(), - }, - } + report = self._build_report( + task, + repeat_idx, + execution_status, + error=error_info, + traces=execution_traces, + config=execution_configs, + usage=execution_usage, + eval_results=eval_results, + ) # Clear registry after task repetition completes self.clear_registry() @@ -1391,20 +1433,26 @@ def submit_task_repeats(task: Task) -> None: try: report = future.result() except Exception as e: - # Create error report for unexpected failures - report = { - "task_id": task_id, - "repeat_idx": repeat_idx, - "status": TaskExecutionStatus.UNKNOWN_EXECUTION_ERROR.value, - "error": { + # A deliberate fail-fast re-raise from _execute_task_repetition + # (fail_on_setup_error / fail_on_task_error / fail_on_evaluation_error) + # must abort the parallel run, matching sequential semantics — rather + # than being swallowed into a degraded report and letting the run continue. + if self.fail_on_setup_error or self.fail_on_task_error or self.fail_on_evaluation_error: + executor.shutdown(wait=False, cancel_futures=True) + raise + # Otherwise this is an unexpected failure inside the worker itself + # (not handled by _execute_task_repetition) — record it with the + # canonical report schema and carry on with the remaining tasks. + report = self._build_report( + task, + repeat_idx, + TaskExecutionStatus.UNKNOWN_EXECUTION_ERROR, + error={ "error_type": type(e).__name__, "error_message": str(e), "traceback": "".join(traceback.format_exception(type(e), e, e.__traceback__)), }, - "traces": {}, - "config": {}, - "eval": None, - } + ) self._append_report_safe(report) @@ -1447,20 +1495,27 @@ def run( model parameters, agent architecture details, and tool specifications. Returns: - List of report dictionaries, one per task repetition. 
Each report contains: + List of report dictionaries, one per task repetition. Every report carries the + same keys (consistent schema) regardless of success or failure: - task_id: Task identifier (UUID) - repeat_idx: Repetition index (0 to n_task_repeats-1) - status: Execution status (one of TaskExecutionStatus enum values) - - traces: Execution traces from all registered components - - config: Configuration from all registered components and benchmark level + - traces: Execution traces from all registered components (``{}`` if unavailable, e.g. setup failure) + - config: Configuration from all registered components and benchmark level (``{}`` if unavailable) + - usage: Aggregated usage from all registered components (``None`` if not collected) - eval: Evaluation results (None if task or evaluation failed) - - error: Error details dict (only present if status is not SUCCESS), containing: + - task: Task summary dict with ``query``, ``metadata``, and ``protocol`` + - error: Error details dict — ``None`` only when status is SUCCESS; otherwise always populated, containing: - error_type: Exception class name - error_message: Exception message - traceback: Full traceback string + - (plus any error-specific extras, e.g. ``component``, ``elapsed``, ``timeout``) Raises: ValueError: If agent_data length doesn't match number of tasks (when agent_data is an iterable). + Exception: If a ``fail_on_setup_error`` / ``fail_on_task_error`` / ``fail_on_evaluation_error`` + flag is set and the corresponding failure occurs, the original exception is re-raised + and the run is aborted (this applies to both sequential and parallel execution). How to use: This is the framework's main orchestration method that runs your entire benchmark. It diff --git a/tests/test_core/test_benchmark/test_report_schema.py b/tests/test_core/test_benchmark/test_report_schema.py new file mode 100644 index 00000000..6091013b --- /dev/null +++ b/tests/test_core/test_benchmark/test_report_schema.py @@ -0,0 +1,281 @@ +"""Tests for report-schema consistency across the whole task lifecycle. + +Every report a benchmark produces — whether the task succeeded or failed in +setup, execution, or evaluation, and whether it ran sequentially or in +parallel — must carry the same set of top-level keys so downstream consumers +can rely on a stable schema. When a task did not succeed, ``report["error"]`` +must always be populated. In addition, when a ``fail_on_*`` flag is set a +parallel run must abort just like a sequential one (it must not silently +swallow the failure and keep going). +""" + +import time + +import pytest + +from maseval import ( + AgentAdapter, + Evaluator, + Task, + TaskExecutionStatus, + TaskQueue, +) +from maseval.core.task import TaskProtocol +from conftest import DummyBenchmark + + +# Canonical top-level keys every report must carry. +REPORT_KEYS = {"task_id", "repeat_idx", "status", "error", "traces", "config", "usage", "eval", "task"} +ERROR_KEYS = {"error_type", "error_message", "traceback"} + + +def assert_canonical_schema(report): + """Assert a single report carries the canonical schema.""" + missing = REPORT_KEYS - set(report.keys()) + assert not missing, f"report is missing keys: {missing}" + + # The ``task`` block is always present and structured. + assert isinstance(report["task"], dict) + assert {"query", "metadata", "protocol"} <= set(report["task"].keys()) + + # ``traces`` / ``config`` are always dicts (never absent / None). 
+ assert isinstance(report["traces"], dict) + assert isinstance(report["config"], dict) + + # ``error`` is None exactly when the task succeeded; otherwise it is populated. + if report["status"] == TaskExecutionStatus.SUCCESS.value: + assert report["error"] is None + else: + assert report["error"] is not None, f"status={report['status']!r} but error is None" + assert ERROR_KEYS <= set(report["error"].keys()) + + +# -------------------------------------------------------------------------- +# Failure-injection benchmarks +# -------------------------------------------------------------------------- + + +class _FailingAgent: + def run(self, query: str) -> str: + raise RuntimeError("agent boom") + + +class _FailingAgentAdapter(AgentAdapter): + def _run_agent(self, query: str) -> str: + return self.agent.run(query) + + +class SetupFailureBenchmark(DummyBenchmark): + def setup_environment(self, agent_data, task, seed_generator): + raise RuntimeError("setup boom") + + +class SelectiveSetupFailureBenchmark(DummyBenchmark): + """Fails ``setup_environment`` only for tasks whose query starts with ``fail``.""" + + def setup_environment(self, agent_data, task, seed_generator): + if task.query.startswith("fail"): + raise RuntimeError("setup boom") + return super().setup_environment(agent_data, task, seed_generator) + + +class SetupTimeoutBenchmark(DummyBenchmark): + def setup_environment(self, agent_data, task, seed_generator): + time.sleep(0.05) + return super().setup_environment(agent_data, task, seed_generator) + + +class ExecutionFailureBenchmark(DummyBenchmark): + def setup_agents(self, agent_data, environment, task, user, seed_generator): + adapter = _FailingAgentAdapter(_FailingAgent(), "failing_agent") + return [adapter], {"failing_agent": adapter} + + +class _FailingEvaluator(Evaluator): + def filter_traces(self, traces): + return traces + + def __call__(self, traces, final_answer=None): + raise ValueError("eval boom") + + +class EvaluationFailureBenchmark(DummyBenchmark): + def setup_evaluators(self, environment, task, agents, user, seed_generator): + return [_FailingEvaluator(task, environment, user)] + + +class UnexpectedWorkerFailureBenchmark(DummyBenchmark): + """Simulates a failure escaping ``_execute_task_repetition``'s own handling. + + This is the only case ``_run_parallel`` should turn into a degraded report + on its own (and only when no ``fail_on_*`` flag is set). 
+ """ + + def _execute_task_repetition(self, task, agent_data, repeat_idx): + if task.query == "boom": + raise RuntimeError("worker exploded") + return super()._execute_task_repetition(task, agent_data, repeat_idx) + + +# name -> (benchmark class, expected status, task timeout_seconds or None) +SCENARIOS = { + "success": (DummyBenchmark, TaskExecutionStatus.SUCCESS, None), + "setup_failure": (SetupFailureBenchmark, TaskExecutionStatus.SETUP_FAILED, None), + "setup_timeout": (SetupTimeoutBenchmark, TaskExecutionStatus.TASK_TIMEOUT, 0.001), + "execution_failure": (ExecutionFailureBenchmark, TaskExecutionStatus.UNKNOWN_EXECUTION_ERROR, None), + "evaluation_failure": (EvaluationFailureBenchmark, TaskExecutionStatus.EVALUATION_FAILED, None), +} + + +def _one_task(timeout_seconds=None): + if timeout_seconds is None: + return TaskQueue.from_list([{"query": "q", "environment_data": {}}]) + return TaskQueue([Task(query="q", environment_data={}, protocol=TaskProtocol(timeout_seconds=timeout_seconds))]) + + +def _many_tasks(n=4): + return TaskQueue.from_list([{"query": f"q{i}", "environment_data": {}} for i in range(n)]) + + +# -------------------------------------------------------------------------- +# Schema-invariance tests +# -------------------------------------------------------------------------- + + +@pytest.mark.core +@pytest.mark.parametrize("num_workers", [1, 2], ids=["sequential", "parallel"]) +@pytest.mark.parametrize("scenario", list(SCENARIOS), ids=list(SCENARIOS)) +def test_report_schema_consistent_across_lifecycle_outcomes(scenario, num_workers): + """Reports carry the canonical schema regardless of where (or whether) a task failed.""" + benchmark_cls, expected_status, timeout = SCENARIOS[scenario] + benchmark = benchmark_cls(num_workers=num_workers) + + reports = benchmark.run(_one_task(timeout), agent_data={"model": "test"}) + + assert len(reports) == 1 + report = reports[0] + assert report["status"] == expected_status.value + assert_canonical_schema(report) + + +@pytest.mark.core +@pytest.mark.parametrize("num_workers", [1, 2], ids=["sequential", "parallel"]) +def test_mixed_outcomes_in_one_run_all_share_schema(num_workers): + """A run mixing successes and a (graceful) setup failure still yields uniform reports.""" + benchmark = SelectiveSetupFailureBenchmark(num_workers=num_workers) + tasks = TaskQueue.from_list( + [ + {"query": "ok1", "environment_data": {}}, + {"query": "fail-me", "environment_data": {}}, + {"query": "ok2", "environment_data": {}}, + ] + ) + + reports = benchmark.run(tasks, agent_data={"model": "test"}) + + assert len(reports) == 3 + for report in reports: + assert_canonical_schema(report) + + statuses = sorted(r["status"] for r in reports) + assert statuses == sorted( + [ + TaskExecutionStatus.SUCCESS.value, + TaskExecutionStatus.SETUP_FAILED.value, + TaskExecutionStatus.SUCCESS.value, + ] + ) + + +@pytest.mark.core +def test_parallel_fallback_unexpected_worker_failure_produces_full_schema_report(): + """When a worker raises unexpectedly (no fail_on_* set), the parallel runner records a full-schema report and carries on.""" + benchmark = UnexpectedWorkerFailureBenchmark(num_workers=2) + tasks = TaskQueue.from_list( + [ + {"query": "ok1", "environment_data": {}}, + {"query": "boom", "environment_data": {}}, + {"query": "ok2", "environment_data": {}}, + ] + ) + + reports = benchmark.run(tasks, agent_data={"model": "test"}) + + assert len(reports) == 3 + for report in reports: + assert_canonical_schema(report) + + boom_reports = [r for r in reports if 
r["status"] == TaskExecutionStatus.UNKNOWN_EXECUTION_ERROR.value] + assert len(boom_reports) == 1 + assert "worker exploded" in boom_reports[0]["error"]["error_message"] + assert boom_reports[0]["error"]["error_type"] == "RuntimeError" + + +@pytest.mark.core +@pytest.mark.parametrize("num_workers", [1, 2], ids=["sequential", "parallel"]) +def test_error_always_populated_when_status_not_success(num_workers): + """Whenever a report's status is not SUCCESS, ``report["error"]`` is populated.""" + for benchmark_cls, expected_status, timeout in SCENARIOS.values(): + benchmark = benchmark_cls(num_workers=num_workers) + reports = benchmark.run(_one_task(timeout), agent_data={"model": "test"}) + report = reports[0] + if report["status"] == TaskExecutionStatus.SUCCESS.value: + assert report["error"] is None + else: + assert report["error"] is not None + assert report["error"]["error_message"] + + +# -------------------------------------------------------------------------- +# Parallel fail-fast tests (must match sequential semantics) +# -------------------------------------------------------------------------- + + +@pytest.mark.core +def test_parallel_run_aborts_on_fail_on_task_error(): + """fail_on_task_error=True aborts a parallel run instead of swallowing the failure.""" + benchmark = ExecutionFailureBenchmark(num_workers=2, fail_on_task_error=True) + with pytest.raises(RuntimeError, match="agent boom"): + benchmark.run(_many_tasks(), agent_data={"model": "test"}) + + +@pytest.mark.core +def test_parallel_run_aborts_on_fail_on_evaluation_error(): + """fail_on_evaluation_error=True aborts a parallel run instead of swallowing the failure.""" + benchmark = EvaluationFailureBenchmark(num_workers=2, fail_on_evaluation_error=True) + with pytest.raises(ValueError, match="eval boom"): + benchmark.run(_many_tasks(), agent_data={"model": "test"}) + + +@pytest.mark.core +def test_parallel_run_aborts_on_fail_on_setup_error(): + """fail_on_setup_error=True aborts a parallel run instead of swallowing the failure.""" + benchmark = SetupFailureBenchmark(num_workers=2, fail_on_setup_error=True) + with pytest.raises(RuntimeError, match="setup boom"): + benchmark.run(_many_tasks(), agent_data={"model": "test"}) + + +@pytest.mark.core +def test_parallel_unexpected_worker_failure_propagates_when_fail_fast(): + """An unexpected worker failure also aborts a parallel run when a fail_on_* flag is set.""" + benchmark = UnexpectedWorkerFailureBenchmark(num_workers=2, fail_on_task_error=True) + tasks = TaskQueue.from_list( + [ + {"query": "ok1", "environment_data": {}}, + {"query": "boom", "environment_data": {}}, + ] + ) + with pytest.raises(RuntimeError, match="worker exploded"): + benchmark.run(tasks, agent_data={"model": "test"}) + + +@pytest.mark.core +def test_parallel_graceful_failure_keeps_running_with_full_schema(): + """With fail_on_* unset, a failed task yields a full-schema report and the run continues.""" + benchmark = ExecutionFailureBenchmark(num_workers=2, fail_on_task_error=False) + reports = benchmark.run(_many_tasks(), agent_data={"model": "test"}) + + assert len(reports) == 4 + for report in reports: + assert_canonical_schema(report) + assert report["status"] == TaskExecutionStatus.UNKNOWN_EXECUTION_ERROR.value From 89aca0029f88a09081d4a76c406bacf7f2cb8644 Mon Sep 17 00:00:00 2001 From: cemde <42615086+cemde@users.noreply.github.com> Date: Tue, 12 May 2026 12:12:13 +0200 Subject: [PATCH 2/2] added changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md 
index 8398a22b..e333b1c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - Fixed MACS real-data tests passing `{"environment_data": task.environment_data}` instead of `task.environment_data` directly, which caused `setup_state` to silently receive an empty tools list. (PR: #58) +- Benchmark reports from `Benchmark.run()` now have a consistent schema across every outcome. Setup failures, setup timeouts, and unexpected worker failures in parallel runs previously produced reports missing the `usage` and `task` keys (with empty `traces`/`config`). Every report now always includes `task_id`, `repeat_idx`, `status`, `error`, `traces`, `config`, `usage`, `eval`, and `task`, and `report["error"]` is always populated whenever `status` is not `SUCCESS`. (PR: #61) +- `fail_on_setup_error`, `fail_on_task_error`, and `fail_on_evaluation_error` now abort a parallel `Benchmark.run()` the same way they abort a sequential run. Previously a parallel run swallowed the failure into a degraded report and kept going. (PR: #61) ### Removed
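
For illustration of the guarantee described in the changelog entry above: the sketch below shows how a downstream consumer might process `Benchmark.run()` output once every report carries the canonical keys. It is not part of the patch; the `summarize_run` helper, the sample report values, and the concrete status strings are hypothetical, while the key names and the "error is populated whenever the repetition did not succeed" invariant follow the schema documented in `_build_report`.

    from typing import Any, Dict, List

    # Canonical top-level keys, as documented in _build_report / Benchmark.run().
    REPORT_KEYS = {"task_id", "repeat_idx", "status", "error", "traces", "config", "usage", "eval", "task"}
    ERROR_KEYS = {"error_type", "error_message", "traceback"}


    def summarize_run(reports: List[Dict[str, Any]]) -> Dict[str, int]:
        """Count outcomes per status, indexing keys directly thanks to the stable schema (hypothetical helper)."""
        counts: Dict[str, int] = {}
        for report in reports:
            assert REPORT_KEYS <= set(report), f"unexpected report keys: {sorted(report)}"
            counts[report["status"]] = counts.get(report["status"], 0) + 1
            if report["error"] is not None:
                # Non-success reports always carry these error fields.
                assert ERROR_KEYS <= set(report["error"])
        return counts


    if __name__ == "__main__":
        # Hand-written sample shaped like a setup-failure report (illustrative values only).
        sample = {
            "task_id": "demo-task",
            "repeat_idx": 0,
            "status": "setup_failed",  # stands in for TaskExecutionStatus.SETUP_FAILED.value (illustrative string)
            "error": {"error_type": "RuntimeError", "error_message": "setup boom", "traceback": ""},
            "traces": {},
            "config": {},
            "usage": None,
            "eval": None,
            "task": {"query": "q", "metadata": {}, "protocol": {}},
        }
        print(summarize_run([sample]))  # -> {'setup_failed': 1}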