From 7afb9cd5b07430b91cbcfcecc15e06bd03d070c3 Mon Sep 17 00:00:00 2001 From: Poseidon Date: Mon, 18 May 2026 10:03:37 +0000 Subject: [PATCH] fix(exceptions): surface trace ID and underlying reason in FailedUpload FailedUpload now carries an optional trace_id, populated from RapidataError responses, and FailedUploadException's rendering prints it next to each item. Datapoint-level failures caused by asset upload errors now inherit the underlying asset errors' reasons (and trace IDs) instead of the opaque "One or more required assets failed to upload" placeholder. Co-Authored-By: Claude Opus 4.7 (1M context) Co-Authored-By: RapidPoseidon --- .../dataset/_rapidata_dataset.py | 92 +++++++++++++++++-- .../exceptions/failed_upload.py | 16 +++- .../exceptions/failed_upload_exception.py | 16 +++- 3 files changed, 113 insertions(+), 11 deletions(-) diff --git a/src/rapidata/rapidata_client/dataset/_rapidata_dataset.py b/src/rapidata/rapidata_client/dataset/_rapidata_dataset.py index 8fa56fbb0..6abd106fb 100644 --- a/src/rapidata/rapidata_client/dataset/_rapidata_dataset.py +++ b/src/rapidata/rapidata_client/dataset/_rapidata_dataset.py @@ -91,7 +91,7 @@ def add_datapoints( executor = ThreadPoolExecutor(max_workers=rapidata_config.upload.maxWorkers) # 3. Execute uploads and incremental datapoint creation - self._execute_incremental_creation( + asset_failures = self._execute_incremental_creation( datapoints, asset_to_datapoints, datapoint_pending_count, @@ -105,7 +105,12 @@ def add_datapoints( # 5. Collect and return results return self._collect_and_return_results( - datapoints, creation_futures, datapoint_pending_count, lock + datapoints, + creation_futures, + datapoint_pending_count, + lock, + asset_failures, + asset_to_datapoints, ) def _build_asset_to_datapoint_mapping( @@ -157,7 +162,7 @@ def _execute_incremental_creation( creation_futures: list[tuple[int, Future]], lock: threading.Lock, executor: ThreadPoolExecutor, - ) -> None: + ) -> list[FailedUpload[str]]: """ Execute asset uploads and incremental datapoint creation. @@ -168,7 +173,12 @@ def _execute_incremental_creation( creation_futures: List to store creation futures. lock: Lock protecting shared state. executor: Thread pool executor for datapoint creation. + + Returns: + Asset-level failures from the upload phase, so callers can map them + back to the datapoints they blocked. """ + asset_failures: list[FailedUpload[str]] = [] # Create progress bar for datapoint creation datapoint_pbar = tqdm( total=len(datapoints), @@ -225,6 +235,8 @@ def _execute_incremental_creation( # Always close progress bar, even on exception datapoint_pbar.close() + return asset_failures + def _create_asset_completion_callback( self, datapoints: list[Datapoint], @@ -369,6 +381,8 @@ def _collect_and_return_results( creation_futures: list[tuple[int, Future]], datapoint_pending_count: dict[int, int], lock: threading.Lock, + asset_failures: list[FailedUpload[str]], + asset_to_datapoints: dict[str, set[int]], ) -> tuple[list[Datapoint], list[FailedUpload[Datapoint]]]: """ Collect results from datapoint creation tasks. @@ -378,6 +392,10 @@ def _collect_and_return_results( creation_futures: List of creation futures. datapoint_pending_count: Datapoints whose assets failed. lock: Lock protecting datapoint_pending_count. + asset_failures: Asset-level failures from the upload phase, used to + attach the underlying error reasons and trace IDs to the + datapoint-level FailedUpload entries. + asset_to_datapoints: Mapping from asset to datapoint indices. Returns: Tuple of (successful_uploads, failed_uploads). @@ -395,15 +413,25 @@ def _collect_and_return_results( # Use from_exception to extract proper error reason from RapidataError failed_uploads.append(FailedUpload.from_exception(datapoints[idx], e)) + # Build reverse mapping: datapoint index -> the asset-level failures + # that blocked it. A single datapoint can have multiple required + # assets fail, so we accumulate all of them. + datapoint_to_asset_failures: dict[int, list[FailedUpload[str]]] = {} + for asset_failure in asset_failures: + affected = asset_to_datapoints.get(asset_failure.item, set()) + for dp_idx in affected: + datapoint_to_asset_failures.setdefault(dp_idx, []).append( + asset_failure + ) + # Handle datapoints whose assets failed to upload with lock: for idx in datapoint_pending_count: logger.warning(f"Datapoint {idx} assets failed to upload") failed_uploads.append( - FailedUpload( - item=datapoints[idx], - error_type="AssetUploadFailed", - error_message="One or more required assets failed to upload", + self._build_asset_failure_for_datapoint( + datapoints[idx], + datapoint_to_asset_failures.get(idx, []), ) ) @@ -412,6 +440,56 @@ def _collect_and_return_results( ) return successful_uploads, failed_uploads + @staticmethod + def _build_asset_failure_for_datapoint( + datapoint: Datapoint, + related_asset_failures: list[FailedUpload[str]], + ) -> FailedUpload[Datapoint]: + """ + Build a datapoint-level FailedUpload from the asset failures that + blocked it. + + The error message reuses the underlying asset errors' reasons so the + user sees the real cause (not just "assets failed"), and the trace IDs + from any RapidataError-backed failures are aggregated so each blocked + datapoint surfaces every backend trace involved. + """ + if not related_asset_failures: + return FailedUpload( + item=datapoint, + error_type="AssetUploadFailed", + error_message="One or more required assets failed to upload", + ) + + # Deduplicate reasons while preserving order so the message is stable. + seen_reasons: set[str] = set() + unique_reasons: list[str] = [] + for fu in related_asset_failures: + if fu.error_message not in seen_reasons: + seen_reasons.add(fu.error_message) + unique_reasons.append(fu.error_message) + + if len(unique_reasons) == 1: + error_message = f"Asset upload failed: {unique_reasons[0]}" + else: + error_message = "Asset upload failed: " + "; ".join(unique_reasons) + + seen_trace_ids: set[str] = set() + unique_trace_ids: list[str] = [] + for fu in related_asset_failures: + if fu.trace_id and fu.trace_id not in seen_trace_ids: + seen_trace_ids.add(fu.trace_id) + unique_trace_ids.append(fu.trace_id) + + trace_id = ", ".join(unique_trace_ids) if unique_trace_ids else None + + return FailedUpload( + item=datapoint, + error_type="AssetUploadFailed", + error_message=error_message, + trace_id=trace_id, + ) + def _create_dataset_groups(self, datapoints: list[Datapoint]) -> None: """Create dataset groups from datapoints that have a group field.""" from rapidata.api_client.models.create_dataset_group_endpoint_input import ( diff --git a/src/rapidata/rapidata_client/exceptions/failed_upload.py b/src/rapidata/rapidata_client/exceptions/failed_upload.py index 29b7ff910..6962488d7 100644 --- a/src/rapidata/rapidata_client/exceptions/failed_upload.py +++ b/src/rapidata/rapidata_client/exceptions/failed_upload.py @@ -20,6 +20,9 @@ class FailedUpload(Generic[T]): error_type: The type of the exception (e.g., "RapidataError"). timestamp: Optional timestamp when the failure occurred. exception: Optional original exception for richer error context. + trace_id: Optional backend trace ID, when the failure originated from a + RapidataError whose response included a traceId. Used to correlate + an SDK-side failure with the backend trace that produced it. """ item: T @@ -27,13 +30,15 @@ class FailedUpload(Generic[T]): error_type: str timestamp: Optional[datetime] = field(default_factory=datetime.now) exception: Optional[Exception] = None + trace_id: Optional[str] = None @classmethod def from_exception(cls, item: T, exception: Exception | None) -> FailedUpload[T]: """ Create a FailedUpload from an item and exception. - For RapidataError exceptions, extracts the clean API error reason. + For RapidataError exceptions, extracts the clean API error reason and + the backend trace ID (when present in the error response). For other exceptions, uses the string representation. Args: @@ -54,9 +59,14 @@ def from_exception(cls, item: T, exception: Exception | None) -> FailedUpload[T] from rapidata.rapidata_client.exceptions.rapidata_error import RapidataError error_type = type(exception).__name__ + trace_id: Optional[str] = None if isinstance(exception, RapidataError): error_message = exception.get_reason() + if isinstance(exception.details, dict): + raw_trace_id = exception.details.get("traceId") + if isinstance(raw_trace_id, str) and raw_trace_id: + trace_id = raw_trace_id else: error_message = str(exception) @@ -65,6 +75,7 @@ def from_exception(cls, item: T, exception: Exception | None) -> FailedUpload[T] error_message=error_message, error_type=error_type, exception=exception, + trace_id=trace_id, ) def format_error_details(self) -> str: @@ -80,6 +91,9 @@ def format_error_details(self) -> str: f"Error Message: {self.error_message}", ] + if self.trace_id: + details.append(f"Trace Id: {self.trace_id}") + if self.timestamp: details.append(f"Timestamp: {self.timestamp.isoformat()}") diff --git a/src/rapidata/rapidata_client/exceptions/failed_upload_exception.py b/src/rapidata/rapidata_client/exceptions/failed_upload_exception.py index 630d06b72..6da1456e7 100644 --- a/src/rapidata/rapidata_client/exceptions/failed_upload_exception.py +++ b/src/rapidata/rapidata_client/exceptions/failed_upload_exception.py @@ -68,10 +68,20 @@ def __str__(self) -> str: lines = [f"{total} datapoint(s) failed to upload:"] - for reason, datapoints in self.failures_by_reason.items(): + # Group internally on the full FailedUpload so each item can carry its + # own trace ID (the public failures_by_reason groups by reason only and + # discards that detail). + grouped: dict[str, list[FailedUpload[Datapoint]]] = defaultdict(list) + for fu in self._failed_uploads: + grouped[fu.error_message].append(fu) + + for reason, failures in grouped.items(): lines.append(f" '{reason}': [") - for dp in datapoints: - lines.append(f" {dp},") + for fu in failures: + if fu.trace_id: + lines.append(f" {fu.item} [trace_id={fu.trace_id}],") + else: + lines.append(f" {fu.item},") lines.append(" ]") failed_upload_message = "\n".join(lines)