fix(connectors): replace overloaded InvalidRecord with distinct error variants#3194
fix(connectors): replace overloaded InvalidRecord with distinct error variants#3194atharvalade wants to merge 7 commits into
Conversation
Codecov Report❌ Patch coverage is
❌ Your patch check has failed because the patch coverage (0.00%) is below the target coverage (50.00%). You can increase the patch coverage or adjust the target coverage. Additional details and impacted files@@ Coverage Diff @@
## master #3194 +/- ##
=============================================
- Coverage 73.78% 56.53% -17.25%
Complexity 943 943
=============================================
Files 1200 1199 -1
Lines 109094 96879 -12215
Branches 85994 73779 -12215
=============================================
- Hits 80492 54770 -25722
- Misses 25866 39520 +13654
+ Partials 2736 2589 -147
🚀 New features to boost your workflow:
|
Removed redundant phrasing from SchemaMismatch error documentation.
|
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs. If you need a review, please ensure CI is green and the PR is rebased on the latest master. Don't hesitate to ping the maintainers - either @core on Discord or by mentioning them directly here on the PR. Thank you for your contribution! |
|
/author |
|
/ready |
hubcio
left a comment
There was a problem hiding this comment.
out-of-scope follow-ups (already tracked, do not block this PR):
- the
AutoCommitWhen::PollingMessagesoffset-before-consume data-loss path is already filed as issue #2928, with #2927 covering the paired bug (sinkconsume()return value discarded). PR #3180 fixes #2927; #2928 remains open. with the new typed variants in this PR, every error path throughwrite_datalands on that same lost-offset hazard, so worth flagging on #2928 that PR 3194 increases its visibility. core/connectors/sinks/iceberg_sink/src/router/dynamic_router.rs:145-165runswrite_dataper table; first failure aborts, earlier tables already committed viatx.commit(), remaining tables stay unwritten, offset advances once for the whole batch. icebergTransactionis single-table so structural fix needs a separate design.- no tests added for the new variant mappings. would be good to cover the 5 map_err sites and the early-return-from-batch-loop path.
- same overloading exists in
core/connectors/sources/postgres_source/src/lib.rs(24Error::InvalidRecordsites mixing sqlx I/O,row.try_getcolumn decode,simd_jsonserialization, and type-cast schema-mismatch). issue #3176 was scoped to iceberg, so it's reasonable that this PR is too -- but please file a follow-up issue mirroring #3176 for postgres_source so the same taxonomy split can land there.
the variant split itself is a reasonable refactor and worth landing once the doc claims are softened and line 196 is addressed.
| writer.write(batch_data).await.map_err(|err| { | ||
| error!("Error while writing record batch: {}", err); | ||
| Error::InvalidRecord | ||
| Error::WriteFailure(err.to_string()) |
There was a problem hiding this comment.
two problems in this batch loop (lines 193-201).
first: line 196 still maps to Error::InvalidRecord (no payload). the PR's stated goal is to replace the overloaded InvalidRecord, but this site is left as the unit variant and loses the underlying arrow_json reader error message. change to Error::InvalidRecordValue(err.to_string()) so the diagnostic survives.
second: an error at line 196 OR 198-201 returns via ? without calling writer.close(). DataFileWriter has no Drop impl, so any RecordBatches already buffered in the writer are silently dropped, and any partially flushed Parquet bytes that already went to file_io become orphaned (you also can't recover the Vec<DataFile> metadata without close()). combined with runtime/src/sink.rs:467 AutoCommitWhen::PollingMessages (offset commits at poll time, before consume runs), the messages whose batches were buffered are lost on restart. wrap the loop so writer.close() runs on early-return (scope guard / explicit close before propagating), or accept the leak and document it.
| let data_files = writer.close().await.map_err(|err| { | ||
| error!("Error while writing data records to Parquet file: {}", err); | ||
| Error::InvalidRecord | ||
| Error::WriteFailure(err.to_string()) |
There was a problem hiding this comment.
writer.close() failure here can leave a partially-flushed Parquet file on the object store with no cleanup, because the Vec<DataFile> is never produced and no transaction commit happens. the WriteFailure name reads as "transient I/O, safe to retry" but retry creates more orphans (each call generates a fresh uuid path at line 119). either add a best-effort cleanup before propagating, or document on the WriteFailure variant that it may leave orphaned files and cleanup is consumer-side.
| /// A catalog commit failed over the network (e.g. REST catalog I/O, | ||
| /// conflict). Transient failures may be retried. | ||
| #[error("Catalog commit error: {0}")] | ||
| CatalogCommitError(String), |
There was a problem hiding this comment.
"transient failures may be retried" is dangerously thin guidance. iceberg::transaction::Transaction::commit(self, &dyn Catalog) consumes self, so a naive retry isn't even possible without rebuilding the transaction + data files. and even then there's no idempotency token: if the REST catalog applied the commit but the response was lost, a rebuilt retry would add the same data files into a second snapshot -> duplicate rows.
suggest rewriting to something like: "a catalog commit failed. Transaction is consumed by commit(), so retrying requires rebuilding the transaction from new data files. retry is not idempotent: callers must verify via the catalog whether the original commit was applied before retrying, otherwise data may be duplicated."
| /// In-memory transaction preparation failed (e.g. invalid partition spec, | ||
| /// schema validation). These failures are deterministic and not retryable. | ||
| #[error("Transaction apply error: {0}")] | ||
| TransactionApplyError(String), |
There was a problem hiding this comment.
"deterministic and not retryable" is too absolute. iceberg's Error carries a retryable() flag and the apply step interacts with partition spec / schema state that this code doesn't statically classify. better wording: "typically deterministic in the current iceberg version; check Error::retryable() on the underlying iceberg error to decide." or just drop the retryability hint and let the caller route based on the inner string.
| #[error("Permanent HTTP error: {0}")] | ||
| PermanentHttpError(String), | ||
| /// The source schema could not be mapped to the destination schema. | ||
| /// Indicates a table definition or configuration problem |
There was a problem hiding this comment.
missing terminal period to match PermanentHttpError style at line 425. the other three new doc comments already end with periods, just this one.
| err | ||
| ); | ||
| Error::InvalidRecord | ||
| Error::CatalogCommitError(err.to_string()) |
There was a problem hiding this comment.
err.to_string() here (and at the four sister sites: lines 181, 200, 206, 219) only renders the top-level Display impl and drops the iceberg Error::source() chain. underlying parquet / arrow / io causes get swallowed - the new typed variant tells you which stage failed but the payload string is no more informative than before.
the whole stated win of distinct variants is better diagnostics, so it's worth wiring this through. one option: walk err.source() manually and join the chain into the string. another: log the chain alongside the existing error! line. format!("{err:#}") only works for anyhow, not thiserror, so don't reach for that.
|
@EdgarModesto23 you might also be interested in this PR because it touches iceberg connector. |
Which issue does this PR close?
Closes #3176
Rationale
Error::InvalidRecordwas used for five unrelated failure modes in the Iceberg sink'swrite_datafunction, making it impossible for callers to distinguish schema mismatches from I/O failures from catalog outages.What changed?
The Iceberg sink mapped Arrow schema conversion errors, Parquet write failures, and Iceberg catalog transaction failures all to
Error::InvalidRecord. Callers could not programmatically decide whether to fix a table definition, skip a corrupt message, or retry a catalog outage.Three new SDK error variants —
SchemaMismatch(String),WriteFailure(String),CatalogError(String)— replace the overloadedInvalidRecordat the appropriate call sites.InvalidRecordis preserved only for the genuine record-batch deserialization error.Local Execution
AI Usage