Skip to content

fix(connectors): replace overloaded InvalidRecord with distinct error variants#3194

Open
atharvalade wants to merge 7 commits into
apache:masterfrom
atharvalade:fix/distinct-iceberg-sink-error-variants
Open

fix(connectors): replace overloaded InvalidRecord with distinct error variants#3194
atharvalade wants to merge 7 commits into
apache:masterfrom
atharvalade:fix/distinct-iceberg-sink-error-variants

Conversation

@atharvalade
Copy link
Copy Markdown
Contributor

@atharvalade atharvalade commented Apr 29, 2026

Which issue does this PR close?

Closes #3176

Rationale

Error::InvalidRecord was used for five unrelated failure modes in the Iceberg sink's write_data function, making it impossible for callers to distinguish schema mismatches from I/O failures from catalog outages.

What changed?

The Iceberg sink mapped Arrow schema conversion errors, Parquet write failures, and Iceberg catalog transaction failures all to Error::InvalidRecord. Callers could not programmatically decide whether to fix a table definition, skip a corrupt message, or retry a catalog outage.

Three new SDK error variants — SchemaMismatch(String), WriteFailure(String), CatalogError(String) — replace the overloaded InvalidRecord at the appropriate call sites. InvalidRecord is preserved only for the genuine record-batch deserialization error.

Local Execution

  • Passed
  • Pre-commit hooks ran

AI Usage

  • Opu 4.6
  • Writing comments, writing PR Description
  • Yes

@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 29, 2026

Codecov Report

❌ Patch coverage is 0% with 5 lines in your changes missing coverage. Please review.
✅ Project coverage is 56.53%. Comparing base (1ae123f) to head (e963fe4).
⚠️ Report is 9 commits behind head on master.

Files with missing lines Patch % Lines
...re/connectors/sinks/iceberg_sink/src/router/mod.rs 0.00% 5 Missing ⚠️

❌ Your patch check has failed because the patch coverage (0.00%) is below the target coverage (50.00%). You can increase the patch coverage or adjust the target coverage.

Additional details and impacted files
@@              Coverage Diff              @@
##             master    #3194       +/-   ##
=============================================
- Coverage     73.78%   56.53%   -17.25%     
  Complexity      943      943               
=============================================
  Files          1200     1199        -1     
  Lines        109094    96879    -12215     
  Branches      85994    73779    -12215     
=============================================
- Hits          80492    54770    -25722     
- Misses        25866    39520    +13654     
+ Partials       2736     2589      -147     
Components Coverage Δ
Rust Core 52.16% <0.00%> (-22.75%) ⬇️
Java SDK 58.44% <ø> (ø)
C# SDK 69.47% <ø> (ø)
Python SDK 81.43% <ø> (ø)
Node SDK 91.44% <ø> (ø)
Go SDK 39.80% <ø> (ø)
Files with missing lines Coverage Δ
core/connectors/sdk/src/lib.rs 56.17% <ø> (ø)
...re/connectors/sinks/iceberg_sink/src/router/mod.rs 39.23% <0.00%> (ø)

... and 272 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Comment thread core/connectors/sdk/src/lib.rs Outdated
Comment thread core/connectors/sdk/src/lib.rs Outdated
@github-actions
Copy link
Copy Markdown

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs.

If you need a review, please ensure CI is green and the PR is rebased on the latest master. Don't hesitate to ping the maintainers - either @core on Discord or by mentioning them directly here on the PR.

Thank you for your contribution!

@github-actions github-actions Bot added S-stale Inactive issue or pull request and removed S-stale Inactive issue or pull request labels May 13, 2026
@hubcio
Copy link
Copy Markdown
Contributor

hubcio commented May 14, 2026

/author

@github-actions github-actions Bot added the S-waiting-on-author PR is waiting on author response label May 14, 2026
@atharvalade
Copy link
Copy Markdown
Contributor Author

/ready

@github-actions github-actions Bot added S-waiting-on-review PR is waiting on a reviewer and removed S-waiting-on-author PR is waiting on author response labels May 17, 2026
Copy link
Copy Markdown
Contributor

@hubcio hubcio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out-of-scope follow-ups (already tracked, do not block this PR):

  • the AutoCommitWhen::PollingMessages offset-before-consume data-loss path is already filed as issue #2928, with #2927 covering the paired bug (sink consume() return value discarded). PR #3180 fixes #2927; #2928 remains open. with the new typed variants in this PR, every error path through write_data lands on that same lost-offset hazard, so worth flagging on #2928 that PR 3194 increases its visibility.
  • core/connectors/sinks/iceberg_sink/src/router/dynamic_router.rs:145-165 runs write_data per table; first failure aborts, earlier tables already committed via tx.commit(), remaining tables stay unwritten, offset advances once for the whole batch. iceberg Transaction is single-table so structural fix needs a separate design.
  • no tests added for the new variant mappings. would be good to cover the 5 map_err sites and the early-return-from-batch-loop path.
  • same overloading exists in core/connectors/sources/postgres_source/src/lib.rs (24 Error::InvalidRecord sites mixing sqlx I/O, row.try_get column decode, simd_json serialization, and type-cast schema-mismatch). issue #3176 was scoped to iceberg, so it's reasonable that this PR is too -- but please file a follow-up issue mirroring #3176 for postgres_source so the same taxonomy split can land there.

the variant split itself is a reasonable refactor and worth landing once the doc claims are softened and line 196 is addressed.

writer.write(batch_data).await.map_err(|err| {
error!("Error while writing record batch: {}", err);
Error::InvalidRecord
Error::WriteFailure(err.to_string())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

two problems in this batch loop (lines 193-201).

first: line 196 still maps to Error::InvalidRecord (no payload). the PR's stated goal is to replace the overloaded InvalidRecord, but this site is left as the unit variant and loses the underlying arrow_json reader error message. change to Error::InvalidRecordValue(err.to_string()) so the diagnostic survives.

second: an error at line 196 OR 198-201 returns via ? without calling writer.close(). DataFileWriter has no Drop impl, so any RecordBatches already buffered in the writer are silently dropped, and any partially flushed Parquet bytes that already went to file_io become orphaned (you also can't recover the Vec<DataFile> metadata without close()). combined with runtime/src/sink.rs:467 AutoCommitWhen::PollingMessages (offset commits at poll time, before consume runs), the messages whose batches were buffered are lost on restart. wrap the loop so writer.close() runs on early-return (scope guard / explicit close before propagating), or accept the leak and document it.

let data_files = writer.close().await.map_err(|err| {
error!("Error while writing data records to Parquet file: {}", err);
Error::InvalidRecord
Error::WriteFailure(err.to_string())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

writer.close() failure here can leave a partially-flushed Parquet file on the object store with no cleanup, because the Vec<DataFile> is never produced and no transaction commit happens. the WriteFailure name reads as "transient I/O, safe to retry" but retry creates more orphans (each call generates a fresh uuid path at line 119). either add a best-effort cleanup before propagating, or document on the WriteFailure variant that it may leave orphaned files and cleanup is consumer-side.

/// A catalog commit failed over the network (e.g. REST catalog I/O,
/// conflict). Transient failures may be retried.
#[error("Catalog commit error: {0}")]
CatalogCommitError(String),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"transient failures may be retried" is dangerously thin guidance. iceberg::transaction::Transaction::commit(self, &dyn Catalog) consumes self, so a naive retry isn't even possible without rebuilding the transaction + data files. and even then there's no idempotency token: if the REST catalog applied the commit but the response was lost, a rebuilt retry would add the same data files into a second snapshot -> duplicate rows.

suggest rewriting to something like: "a catalog commit failed. Transaction is consumed by commit(), so retrying requires rebuilding the transaction from new data files. retry is not idempotent: callers must verify via the catalog whether the original commit was applied before retrying, otherwise data may be duplicated."

/// In-memory transaction preparation failed (e.g. invalid partition spec,
/// schema validation). These failures are deterministic and not retryable.
#[error("Transaction apply error: {0}")]
TransactionApplyError(String),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"deterministic and not retryable" is too absolute. iceberg's Error carries a retryable() flag and the apply step interacts with partition spec / schema state that this code doesn't statically classify. better wording: "typically deterministic in the current iceberg version; check Error::retryable() on the underlying iceberg error to decide." or just drop the retryability hint and let the caller route based on the inner string.

#[error("Permanent HTTP error: {0}")]
PermanentHttpError(String),
/// The source schema could not be mapped to the destination schema.
/// Indicates a table definition or configuration problem
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing terminal period to match PermanentHttpError style at line 425. the other three new doc comments already end with periods, just this one.

err
);
Error::InvalidRecord
Error::CatalogCommitError(err.to_string())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

err.to_string() here (and at the four sister sites: lines 181, 200, 206, 219) only renders the top-level Display impl and drops the iceberg Error::source() chain. underlying parquet / arrow / io causes get swallowed - the new typed variant tells you which stage failed but the payload string is no more informative than before.

the whole stated win of distinct variants is better diagnostics, so it's worth wiring this through. one option: walk err.source() manually and join the chain into the string. another: log the chain alongside the existing error! line. format!("{err:#}") only works for anyhow, not thiserror, so don't reach for that.

@github-actions github-actions Bot added S-waiting-on-author PR is waiting on author response and removed S-waiting-on-review PR is waiting on a reviewer labels May 20, 2026
@hubcio
Copy link
Copy Markdown
Contributor

hubcio commented May 20, 2026

@EdgarModesto23 you might also be interested in this PR because it touches iceberg connector.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

S-waiting-on-author PR is waiting on author response

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Error::InvalidRecord is overloaded across unrelated failure modes

3 participants