2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -22,6 +22,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- Fixed MACS real-data tests passing `{"environment_data": task.environment_data}` instead of `task.environment_data` directly, which caused `setup_state` to silently receive an empty tools list. (PR: #58)
- Benchmark reports from `Benchmark.run()` now have a consistent schema across every outcome. Setup failures, setup timeouts, and unexpected worker failures in parallel runs previously produced reports missing the `usage` and `task` keys (with empty `traces`/`config`). Every report now always includes `task_id`, `repeat_idx`, `status`, `error`, `traces`, `config`, `usage`, `eval`, and `task`, and `report["error"]` is always populated whenever `status` is not `SUCCESS`. (PR: #61)
- `fail_on_setup_error`, `fail_on_task_error`, and `fail_on_evaluation_error` now abort a parallel `Benchmark.run()` the same way they abort a sequential run. Previously a parallel run swallowed the failure into a degraded report and kept going. (PR: #61)

### Removed

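To make the second entry concrete, here is a hedged illustration of a setup-failure report under the canonical schema. All field values below are hypothetical, and the exact `status` string depends on which `TaskExecutionStatus` member was recorded:

```python
# Hypothetical setup-failure report under the canonical schema (PR #61).
# Every key is present even though setup never produced traces or usage.
report = {
    "task_id": "2f6c0c1e-...",    # illustrative UUID
    "repeat_idx": 0,
    "status": "setup_error",      # illustrative TaskExecutionStatus value
    "error": {                    # always populated when status is not SUCCESS
        "error_type": "ConnectionError",
        "error_message": "environment backend unreachable",
        "traceback": "Traceback (most recent call last): ...",
    },
    "traces": {},                 # schema default when nothing was collected
    "config": {},
    "usage": None,
    "eval": None,
    "task": {"query": "...", "metadata": {}, "protocol": {}},
}
```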
155 changes: 105 additions & 50 deletions maseval/core/benchmark.py
@@ -519,6 +519,64 @@ def _invoke_callbacks(self, method_name: str, *args, suppress_errors: bool = Tru

return errors

def _build_report(
self,
task: Task,
repeat_idx: int,
status: TaskExecutionStatus,
*,
error: Optional[Dict[str, Any]] = None,
traces: Optional[Dict[str, Any]] = None,
config: Optional[Dict[str, Any]] = None,
usage: Optional[Dict[str, Any]] = None,
eval_results: Any = None,
) -> Dict[str, Any]:
"""Build a task-repetition report with the canonical schema.

Every report — success or failure, from setup, execution or evaluation,
and from sequential or parallel runs — carries the same top-level keys so
downstream consumers can rely on a stable schema. ``error`` is ``None``
only for ``SUCCESS``; for any other status it is always populated (a
generic placeholder is synthesised if a caller forgot to supply one).

Args:
task: The task this report describes.
repeat_idx: Repetition index (0 to n_task_repeats-1).
status: Final execution status for this repetition.
error: Error details dict (``error_type``/``error_message``/``traceback``,
plus any extra fields), or ``None`` if the repetition succeeded.
traces: Collected execution traces (defaults to ``{}`` when not available).
config: Collected component/benchmark configuration (defaults to ``{}``).
usage: Collected usage totals, or ``None`` if not collected.
eval_results: Evaluation results, or ``None`` if evaluation was skipped or failed.

Returns:
Report dictionary with keys: ``task_id``, ``repeat_idx``, ``status``,
``error``, ``traces``, ``config``, ``usage``, ``eval``, ``task``.
"""
if status is not TaskExecutionStatus.SUCCESS and error is None:
# Defensive: a non-success report must always carry error details.
error = {
"error_type": "UnknownError",
"error_message": f"Task ended with status '{status.value}' but no error details were recorded.",
"traceback": "",
}
return {
"task_id": str(task.id),
"repeat_idx": repeat_idx,
"status": status.value,
"error": error,
"traces": traces if traces is not None else {},
"config": config if config is not None else {},
"usage": usage,
"eval": eval_results,
"task": {
"query": task.query,
"metadata": dict(task.metadata),
"protocol": task.protocol.to_dict(),
},
}

def _append_report_safe(self, report: Dict[str, Any]) -> None:
"""Append a report to the reports list (thread-safe).

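A minimal sketch of the defensive branch above, using the `UNKNOWN_EXECUTION_ERROR` status that appears later in this diff (`benchmark` and `task` are illustrative instances):

```python
# Sketch: a non-SUCCESS status with no error details supplied triggers
# the synthesised "UnknownError" placeholder.
report = benchmark._build_report(task, 0, TaskExecutionStatus.UNKNOWN_EXECUTION_ERROR)

assert report["error"]["error_type"] == "UnknownError"
assert report["error"]["error_message"].startswith("Task ended with status")
assert report["traces"] == {} and report["config"] == {}
assert report["usage"] is None and report["eval"] is None
```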
Expand Down Expand Up @@ -1089,16 +1147,14 @@ def _execute_task_repetition(
"traceback": "".join(traceback.format_exception(type(e), e, e.__traceback__)),
}

# Create a minimal report for this timeout
report = {
"task_id": str(task.id),
"repeat_idx": repeat_idx,
"status": execution_status.value,
"error": error_info,
"traces": e.partial_traces,
"config": {},
"eval": None,
}
# Create a minimal report for this timeout (canonical schema)
report = self._build_report(
task,
repeat_idx,
execution_status,
error=error_info,
traces=e.partial_traces,
)
self.clear_registry()
return report

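For reference, the resulting timeout report keeps whatever traces were collected before the deadline while the other keys fall back to their schema defaults; a sketch, with trace contents hypothetical:

```python
# Sketch: shape of the timeout report returned by the handler above.
assert report["status"] == execution_status.value
assert report["traces"] == e.partial_traces    # pre-deadline traces survive
assert report["config"] == {}                  # schema default, nothing collected
assert report["usage"] is None
assert report["error"]["error_type"]           # error details always recorded
```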
@@ -1111,22 +1167,13 @@ def _execute_task_repetition(
"traceback": "".join(traceback.format_exception(type(e), e, e.__traceback__)),
}

# Create a minimal report for this failed setup
report = {
"task_id": str(task.id),
"repeat_idx": repeat_idx,
"status": execution_status.value,
"error": error_info,
"traces": {},
"config": {},
"eval": None,
}
self.clear_registry()

if self.fail_on_setup_error:
raise

return report
# Create a minimal report for this failed setup (canonical schema)
return self._build_report(task, repeat_idx, execution_status, error=error_info)

# 2. Execute agent system with optional user interaction loop
try:
Expand Down Expand Up @@ -1265,21 +1312,16 @@ def _execute_task_repetition(
eval_results = None

# 5. Build report — all keys always present for consistent schema
report: Dict[str, Any] = {
"task_id": str(task.id),
"repeat_idx": repeat_idx,
"status": execution_status.value,
"error": error_info,
"traces": execution_traces,
"config": execution_configs,
"usage": execution_usage,
"eval": eval_results,
"task": {
"query": task.query,
"metadata": dict(task.metadata),
"protocol": task.protocol.to_dict(),
},
}
report = self._build_report(
task,
repeat_idx,
execution_status,
error=error_info,
traces=execution_traces,
config=execution_configs,
usage=execution_usage,
eval_results=eval_results,
)

# Clear registry after task repetition completes
self.clear_registry()
Expand Down Expand Up @@ -1391,20 +1433,26 @@ def submit_task_repeats(task: Task) -> None:
try:
report = future.result()
except Exception as e:
# Create error report for unexpected failures
report = {
"task_id": task_id,
"repeat_idx": repeat_idx,
"status": TaskExecutionStatus.UNKNOWN_EXECUTION_ERROR.value,
"error": {
# A deliberate fail-fast re-raise from _execute_task_repetition
# (fail_on_setup_error / fail_on_task_error / fail_on_evaluation_error)
# must abort the parallel run, matching sequential semantics — rather
# than being swallowed into a degraded report and letting the run continue.
if self.fail_on_setup_error or self.fail_on_task_error or self.fail_on_evaluation_error:
executor.shutdown(wait=False, cancel_futures=True)
raise
# Otherwise this is an unexpected failure inside the worker itself
# (not handled by _execute_task_repetition) — record it with the
# canonical report schema and carry on with the remaining tasks.
report = self._build_report(
task,
repeat_idx,
TaskExecutionStatus.UNKNOWN_EXECUTION_ERROR,
error={
"error_type": type(e).__name__,
"error_message": str(e),
"traceback": "".join(traceback.format_exception(type(e), e, e.__traceback__)),
},
"traces": {},
"config": {},
"eval": None,
}
)

self._append_report_safe(report)

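From the caller's side, the restored fail-fast behaviour means a parallel run now surfaces the original exception instead of a degraded report; a hedged usage sketch (constructor and `run()` arguments are illustrative, not the exact API):

```python
# Sketch: with a fail-fast flag set, the first matching failure aborts the
# whole parallel run; pending futures are cancelled and the original
# exception propagates out of Benchmark.run().
benchmark = Benchmark(fail_on_setup_error=True)   # illustrative construction

try:
    reports = benchmark.run(tasks)                # arguments illustrative
except Exception as e:
    # Same semantics as a sequential run: the setup error is re-raised
    # rather than being swallowed into a degraded report.
    handle_aborted_run(e)                         # hypothetical helper
```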
Expand Down Expand Up @@ -1447,20 +1495,27 @@ def run(
model parameters, agent architecture details, and tool specifications.

Returns:
List of report dictionaries, one per task repetition. Each report contains:
List of report dictionaries, one per task repetition. Every report carries the
same keys (consistent schema) regardless of success or failure:
- task_id: Task identifier (UUID)
- repeat_idx: Repetition index (0 to n_task_repeats-1)
- status: Execution status (one of TaskExecutionStatus enum values)
- traces: Execution traces from all registered components
- config: Configuration from all registered components and benchmark level
- traces: Execution traces from all registered components (``{}`` if unavailable, e.g. setup failure)
- config: Configuration from all registered components and benchmark level (``{}`` if unavailable)
- usage: Aggregated usage from all registered components (``None`` if not collected)
- eval: Evaluation results (None if task or evaluation failed)
- error: Error details dict (only present if status is not SUCCESS), containing:
- task: Task summary dict with ``query``, ``metadata``, and ``protocol``
- error: Error details dict — ``None`` only when status is SUCCESS; otherwise always populated, containing:
- error_type: Exception class name
- error_message: Exception message
- traceback: Full traceback string
- (plus any error-specific extras, e.g. ``component``, ``elapsed``, ``timeout``)

Raises:
ValueError: If agent_data length doesn't match number of tasks (when agent_data is an iterable).
Exception: If a ``fail_on_setup_error`` / ``fail_on_task_error`` / ``fail_on_evaluation_error``
flag is set and the corresponding failure occurs, the original exception is re-raised
and the run is aborted (this applies to both sequential and parallel execution).

How to use:
This is the framework's main orchestration method that runs your entire benchmark. It
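Because every report now shares one schema, downstream consumers can iterate the results without per-status key checks; a small consumer sketch (only the documented keys are relied on):

```python
# Sketch: schema-stable consumption of Benchmark.run() output.
reports = benchmark.run(tasks)  # arguments elided; see the docstring above

for r in reports:
    if r["error"] is not None:  # populated whenever status is not SUCCESS
        print(f'{r["task_id"]}[{r["repeat_idx"]}] {r["status"]}: '
              f'{r["error"]["error_type"]}: {r["error"]["error_message"]}')
    else:
        print(f'{r["task_id"]}[{r["repeat_idx"]}] ok, eval={r["eval"]}')
```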