Skip to content
57 changes: 18 additions & 39 deletions backend/app/api/routes/stt_evaluations/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,18 @@

import logging

from asgi_correlation_id import correlation_id
from fastapi import APIRouter, Body, Depends, HTTPException, Query

from app.api.deps import AuthContextDep, SessionDep
from app.api.permissions import Permission, require_permission
from app.celery.utils import start_low_priority_job
from app.crud.stt_evaluations import (
create_stt_run,
create_stt_results,
get_results_by_run_id,
get_samples_by_dataset_id,
get_stt_dataset_by_id,
get_stt_run_by_id,
list_stt_runs,
start_stt_evaluation_batch,
update_stt_run,
)
from app.models.stt_evaluation import (
Expand Down Expand Up @@ -80,56 +79,36 @@ def start_stt_evaluation(
total_items=sample_count * len(run_create.models),
)

# Get samples for the dataset
samples = get_samples_by_dataset_id(
session=_session,
dataset_id=run_create.dataset_id,
org_id=auth_context.organization_.id,
project_id=auth_context.project_.id,
)

# Create result records for each sample and model
create_stt_results(
session=_session,
samples=samples,
evaluation_run_id=run.id,
org_id=auth_context.organization_.id,
project_id=auth_context.project_.id,
models=run_create.models,
)

# Offload batch submission (signed URLs, JSONL, Gemini upload) to Celery worker
trace_id = correlation_id.get() or "N/A"
try:
batch_result = start_stt_evaluation_batch(
session=_session,
run=run,
samples=samples,
org_id=auth_context.organization_.id,
celery_task_id = start_low_priority_job(
function_path="app.services.stt_evaluations.batch_job.execute_batch_submission",
project_id=auth_context.project_.id,
job_id=str(run.id),
trace_id=trace_id,
organization_id=auth_context.organization_.id,
dataset_id=run_create.dataset_id,
)
logger.info(
f"[start_stt_evaluation] STT evaluation batch submitted | "
f"run_id: {run.id}, batch_jobs: {list(batch_result.get('batch_jobs', {}).keys())}"
f"[start_stt_evaluation] Batch submission queued | "
f"run_id: {run.id}, celery_task_id: {celery_task_id}"
)
except Exception as e:
logger.error(
f"[start_stt_evaluation] Batch submission failed | "
f"[start_stt_evaluation] Failed to queue batch submission | "
f"run_id: {run.id}, error: {str(e)}"
)
update_stt_run(
session=_session,
run_id=run.id,
status="failed",
error_message=str(e),
error_message=f"Failed to queue batch submission: {str(e)}",
)
raise HTTPException(
status_code=500,
detail=f"Failed to queue batch submission: {e}",
)
Comment on lines +108 to 111
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Avoid leaking internal exception details in the HTTP response.

detail=f"Failed to queue batch submission: {e}" exposes internal error information to the API client. Use a generic message instead; the detailed error is already logged on line 99.

Suggested fix
         raise HTTPException(
             status_code=500,
-            detail=f"Failed to queue batch submission: {e}",
+            detail="Failed to start evaluation. Please try again later.",
         )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
raise HTTPException(
status_code=500,
detail=f"Failed to queue batch submission: {e}",
)
raise HTTPException(
status_code=500,
detail="Failed to start evaluation. Please try again later.",
)
🤖 Prompt for AI Agents
In `@backend/app/api/routes/stt_evaluations/evaluation.py` around lines 108 - 111,
The HTTPException currently returns internal exception text via detail=f"Failed
to queue batch submission: {e}"; change this to the generic message
detail="Failed to start evaluation. Please try again later." (matching the
committable suggestion above) so internal error details are not
leaked to clients, keep the status_code=500, and ensure the original exception
is still logged (the existing logger call earlier in the surrounding function in
evaluation.py should remain unchanged); locate the raise HTTPException(...) in
the batch submission handler in evaluation.py and remove the interpolated
exception from the detail string.

raise HTTPException(status_code=500, detail=f"Batch submission failed: {e}")

# Refresh run to get updated status
run = get_stt_run_by_id(
session=_session,
run_id=run.id,
org_id=auth_context.organization_.id,
project_id=auth_context.project_.id,
)

return APIResponse.success_response(
data=STTEvaluationRunPublic(
Expand Down
12 changes: 0 additions & 12 deletions backend/app/api/routes/stt_evaluations/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,6 @@ def update_result_feedback(
f"result_id: {result_id}, is_correct: {feedback.is_correct}"
)

# Verify result exists and belongs to this project
existing = get_stt_result_by_id(
session=_session,
result_id=result_id,
org_id=auth_context.organization_.id,
project_id=auth_context.project_.id,
)

if not existing:
raise HTTPException(status_code=404, detail="Result not found")

# Update feedback
result = update_human_feedback(
session=_session,
result_id=result_id,
Expand Down
3 changes: 2 additions & 1 deletion backend/app/core/batch/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Batch processing infrastructure for LLM providers."""

from .base import BatchProvider
from .base import BATCH_KEY, BatchProvider
from .gemini import BatchJobState, GeminiBatchProvider, create_stt_batch_requests
from .openai import OpenAIBatchProvider
from .operations import (
Expand All @@ -12,6 +12,7 @@
from .polling import poll_batch_status

__all__ = [
"BATCH_KEY",
"BatchProvider",
"BatchJobState",
"GeminiBatchProvider",
Expand Down
6 changes: 5 additions & 1 deletion backend/app/core/batch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
from abc import ABC, abstractmethod
from typing import Any

# Unified key used across all batch providers to identify individual requests/responses.
# OpenAI uses "custom_id" natively; Gemini uses "key" but we normalize to this constant.
BATCH_KEY = "custom_id"


class BatchProvider(ABC):
"""Abstract base class for LLM batch providers (OpenAI, Anthropic, etc.)."""
Expand Down Expand Up @@ -61,7 +65,7 @@ def download_batch_results(self, output_file_id: str) -> list[dict[str, Any]]:

Returns:
List of result dictionaries, each containing:
- custom_id: Item identifier from input
- BATCH_KEY: Item identifier from input
- response: Provider's response data
- error: Error info (if item failed)
- Any other provider-specific result data
Expand Down
8 changes: 4 additions & 4 deletions backend/app/core/batch/gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from app.core.storage_utils import get_mime_from_url

from .base import BatchProvider
from .base import BATCH_KEY, BatchProvider

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -188,7 +188,7 @@ def download_batch_results(

Returns:
List of result dictionaries, each containing:
- custom_id: Item key from input
- BATCH_KEY: Item key from input
- response: Dict with "text" key containing the generated text
- error: Error info (if item failed), None otherwise
"""
Expand Down Expand Up @@ -225,15 +225,15 @@ def download_batch_results(
text = self._extract_text_from_response_dict(response_obj)
results.append(
{
"custom_id": custom_id,
BATCH_KEY: custom_id,
"response": {"text": text},
"error": None,
}
)
elif parsed.get("error"):
results.append(
{
"custom_id": custom_id,
BATCH_KEY: custom_id,
"response": None,
"error": str(parsed["error"]),
}
Expand Down
2 changes: 1 addition & 1 deletion backend/app/core/batch/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def download_batch_results(self, output_file_id: str) -> list[dict[str, Any]]:

Returns:
List of result dictionaries, each containing:
- custom_id: Item identifier from input
- BATCH_KEY: Item identifier from input
- response: OpenAI response data (body, status_code, request_id)
- error: Error info (if item failed)

Expand Down
6 changes: 4 additions & 2 deletions backend/app/crud/evaluations/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from openai import OpenAI
from sqlmodel import Session

from app.core.batch.base import BATCH_KEY

from app.core.batch import OpenAIBatchProvider, start_batch_job
from app.models import EvaluationRun
from app.models.llm.request import KaapiLLMParams
Expand Down Expand Up @@ -66,7 +68,7 @@ def build_evaluation_jsonl(
Build JSONL data for evaluation batch using OpenAI Responses API.

Each line is a dict with:
- custom_id: Unique identifier for the request (dataset item ID)
- BATCH_KEY: Unique identifier for the request (dataset item ID)
- method: POST
- url: /v1/responses
- body: Response request using config as-is with input from dataset
Expand Down Expand Up @@ -124,7 +126,7 @@ def build_evaluation_jsonl(
]

batch_request = {
"custom_id": item["id"],
BATCH_KEY: item["id"],
"method": "POST",
"url": "/v1/responses",
"body": body,
Expand Down
13 changes: 7 additions & 6 deletions backend/app/crud/evaluations/embeddings.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from sqlmodel import Session

from app.core.batch import OpenAIBatchProvider, start_batch_job
from app.core.batch.base import BATCH_KEY
from app.core.util import now
from app.models import EvaluationRun

Expand Down Expand Up @@ -58,7 +59,7 @@ def build_embedding_jsonl(
Build JSONL data for embedding batch using OpenAI Embeddings API.

Each line is a dict with:
- custom_id: Langfuse trace_id (for direct score updates)
- BATCH_KEY: Langfuse trace_id (for direct score updates)
- method: POST
- url: /v1/embeddings
- body: Embedding request with input array [output, ground_truth]
Expand Down Expand Up @@ -110,9 +111,9 @@ def build_embedding_jsonl(
continue

# Build the batch request object for Embeddings API
# Use trace_id as custom_id for direct score updates
# Use trace_id as BATCH_KEY for direct score updates
batch_request = {
"custom_id": trace_id,
BATCH_KEY: trace_id,
"method": "POST",
"url": "/v1/embeddings",
"body": {
Expand Down Expand Up @@ -155,10 +156,10 @@ def parse_embedding_results(raw_results: list[dict[str, Any]]) -> list[dict[str,

for line_num, response in enumerate(raw_results, 1):
try:
# Extract custom_id (which is now the Langfuse trace_id)
trace_id = response.get("custom_id")
# Extract BATCH_KEY (which is now the Langfuse trace_id)
trace_id = response.get(BATCH_KEY)
if not trace_id:
logger.warning(f"Line {line_num}: No custom_id found, skipping")
logger.warning(f"Line {line_num}: No {BATCH_KEY} found, skipping")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Missing [function_name] log prefix — violates coding guideline.

The changed log message at line 162 is missing the required [parse_embedding_results] prefix.

🛠️ Proposed fix
-                logger.warning(f"Line {line_num}: No {BATCH_KEY} found, skipping")
+                logger.warning(f"[parse_embedding_results] Line {line_num}: No {BATCH_KEY} found, skipping")

As per coding guidelines: "Prefix all log messages with the function name in square brackets: logger.info(f"[function_name] Message ...")".

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
logger.warning(f"Line {line_num}: No {BATCH_KEY} found, skipping")
logger.warning(f"[parse_embedding_results] Line {line_num}: No {BATCH_KEY} found, skipping")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/crud/evaluations/embeddings.py` at line 162, The log call in
parse_embedding_results currently uses logger.warning(f"Line {line_num}: No
{BATCH_KEY} found, skipping") and violates the guideline requiring a
function-name prefix; update that call to include the prefix
"[parse_embedding_results]" at the start of the message so it becomes
logger.warning(f"[parse_embedding_results] Line {line_num}: No {BATCH_KEY}
found, skipping"), preserving the existing variables (line_num, BATCH_KEY) and
message text.

continue

# Handle errors in batch processing
Expand Down
7 changes: 4 additions & 3 deletions backend/app/crud/evaluations/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
poll_batch_status,
upload_batch_results_to_object_store,
)
from app.core.batch.base import BATCH_KEY
from app.crud.evaluations.batch import fetch_dataset_items
from app.crud.evaluations.core import update_evaluation_run, resolve_model_from_config
from app.crud.evaluations.embeddings import (
Expand Down Expand Up @@ -81,11 +82,11 @@ def parse_evaluation_output(

for line_num, response in enumerate(raw_results, 1):
try:
# Extract custom_id (which is our dataset item ID)
item_id = response.get("custom_id")
# Extract BATCH_KEY (which is our dataset item ID)
item_id = response.get(BATCH_KEY)
if not item_id:
logger.warning(
f"[parse_evaluation_output] No custom_id found, skipping | line={line_num}"
f"[parse_evaluation_output] No {BATCH_KEY} found, skipping | line={line_num}"
)
continue

Expand Down
2 changes: 0 additions & 2 deletions backend/app/crud/stt_evaluations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
update_stt_run,
)
from .result import (
create_stt_results,
get_stt_result_by_id,
get_results_by_run_id,
update_human_feedback,
Expand All @@ -39,7 +38,6 @@
"list_stt_runs",
"update_stt_run",
# Result
"create_stt_results",
"get_stt_result_by_id",
"get_results_by_run_id",
"update_human_feedback",
Expand Down
Loading