fix(connectors): Add unwrap_envelope transform and envelope detection to fix source-sink format mismatch#3197
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #3197 +/- ##
=============================================
- Coverage 74.18% 56.61% -17.58%
Complexity 943 943
=============================================
Files 1200 1201 +1
Lines 109645 97136 -12509
Branches 86534 74025 -12509
=============================================
- Hits 81345 54989 -26356
- Misses 25553 39564 +14011
+ Partials 2747 2583 -164
🚀 New features to boost your workflow:
|
|
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs. If you need a review, please ensure CI is green and the PR is rebased on the latest master. Don't hesitate to ping the maintainers - either @core on Discord or by mentioning them directly here on the PR. Thank you for your contribution! |
|
@atharvalade please rebase this PR /author |
8defa26 to
7c92361
Compare
done |
|
/ready |
hubcio
left a comment
There was a problem hiding this comment.
review notes (not posted as line comments because targets are out-of-diff or already tracked):
-
the FFI
(consume)return code is still discarded atcore/connectors/runtime/src/sink.rs:642-650(ConsumeCallbackreturnsi32,sdk/src/sink.rs:28-36) andAutoCommit::When(AutoCommitWhen::PollingMessages)atcore/connectors/runtime/src/sink.rs:467commits offsets before the sink runs. this PR widens the blast radius: any envelope-detected batch nowreturn Err(Error::InvalidRecord)from the sink, gets logged, and the offsets have already advanced. tracked in #2927 and #2928 but this PR adds a brand new fail path triggerable purely by source/sink misconfiguration, so they should land together. -
core/connectors/sinks/delta_sink/src/sink.rs:106has the same envelope-vs-flat-JSON mismatch and no detection or unwrap guidance. fixing one sink and leaving the other broken means #3174 is still reachable through the postgres → delta path. either replicate the detection (with the tighter shape suggested below) or, better, documentunwrap_envelopeas the single fix and drop sink-side sniffing entirely. -
neither
core/connectors/sdk/README.mdnorcore/connectors/sinks/iceberg_sink/README.mdmention the newunwrap_envelopetransform or the source-compatibility requirement forpostgres_source→ iceberg. discoverability is the whole point of this fix; please add at least a short entry to both. -
architecturally, sink-side envelope sniffing is the wrong layer. transform discipline + docs is the real fix. if sink-side detection stays, it should be opt-in via config, not a hardcoded postgres-shaped heuristic.
overall, i'm not fan of this PR.
| return false; | ||
| }; | ||
| obj.contains_key("table_name") && obj.contains_key("data") | ||
| } |
There was a problem hiding this comment.
this heuristic checks 2 of the 5 keys on DatabaseRecord (table_name, operation_type, timestamp, data, old_data - see core/connectors/sources/postgres_source/src/lib.rs:110-116).
false positives: any legit iceberg table modeling audit logs, catalogs, or CDC metadata that happens to have table_name + data columns gets the whole batch rejected.
false negatives: Debezium / Kafka-Connect envelopes use before / after / op / source / payload and slip right through into JsonArrowReader, which then writes nulls - the exact bug #3174 is supposed to fix.
also it bakes a postgres-source shape into a generic iceberg sink. options: tighten to the full 4-or-5 key shape, or move detection behind an opt-in detect_envelope config flag, or drop sink-side sniffing entirely and rely on unwrap_envelope + docs.
There was a problem hiding this comment.
Removed entirely, relying on transform and docs now
| .collect(); | ||
|
|
||
| if let Some(first) = msgs.first() | ||
| && looks_like_envelope(first) |
There was a problem hiding this comment.
the sniff only looks at msgs.first(). mixed batch where the first message is flat and the rest are envelopes (or vice versa) skips detection entirely - envelope rows then hit JsonArrowReader and write nulls, which is the silent-corruption mode #3174 was opened for. envelope-first drops the whole batch including any valid flat rows.
either scan all messages, or document and enforce a batch-homogeneity invariant somewhere upstream.
There was a problem hiding this comment.
Dropped sink-side detection completely
| 'unwrap_envelope' transform with field = \"data\" to your \ | ||
| connector config to extract the inner payload." | ||
| ); | ||
| return Err(Error::InvalidRecord); |
There was a problem hiding this comment.
Error::InvalidRecord is a unit variant; the actionable hint ("add an unwrap_envelope transform with field = data") lives only in the error! log and is lost the moment the caller receives the error. Error::InvalidRecordValue(String) already exists (sdk/src/lib.rs:389) and is used in delta_sink and influxdb_source - switch to it and carry the hint in the variant.
this is also a concrete instance of #3176 (overloaded InvalidRecord).
| (detected 'table_name' + 'data' fields). The Iceberg sink \ | ||
| expects flat JSON matching the target table schema. Add an \ | ||
| 'unwrap_envelope' transform with field = \"data\" to your \ | ||
| connector config to extract the inner payload." |
There was a problem hiding this comment.
two issues with this error message:
-
it hardcodes
field = "data", which only matches the postgres envelope shape. if/when detection broadens to Debezium (after) or other shapes, the suggestion is wrong. -
no connector id / plugin id in the log line, so in a multi-tenant deployment with several iceberg sinks you cannot tell which one rejected the batch.
| /// entire payload with the contents of `data`. | ||
| #[derive(Debug, Serialize, Deserialize)] | ||
| pub struct UnwrapEnvelopeConfig { | ||
| pub field: String, |
There was a problem hiding this comment.
UnwrapEnvelopeConfig.field: String has no validation. an empty string deserializes fine through from_config at transforms/mod.rs:137-141, then every message hits the missing-field branch and warn-spams at message rate (compounds the log flood on the json side).
reject empty field either via a custom Deserialize impl, or inline in UnwrapEnvelope::new, or in the from_config arm.
There was a problem hiding this comment.
new() now rejects empty field with InvalidConfigValue
| } | ||
| TransformType::UnwrapEnvelope => { | ||
| let cfg: UnwrapEnvelopeConfig = | ||
| serde_json::from_value(raw.clone()).map_err(|_| Error::InvalidConfig)?; |
There was a problem hiding this comment.
.map_err(|_| Error::InvalidConfig) throws away the serde error detail. for a user with a typo in field or a wrong type, all they see is InvalidConfig - no line number, no field name, no expected-vs-got. this is a pre-existing pattern in every arm of from_config, but this PR adds another instance.
worth either fixing all arms in a follow-up, or at least propagating the serde error through Error::InvalidConfigDetail(String) or similar.
There was a problem hiding this comment.
Now propagates detail via InvalidConfigValue(format!(...))
|
/ready |
| field = "data" | ||
| ``` | ||
|
|
||
| Alternatively, configure the Postgres source with `payload_column` and `payload_format = "json_direct"` to bypass the envelope entirely — see the Postgres source README for details. |
There was a problem hiding this comment.
come on, don't leave postgres reference in iceberg readme. go thru all changes in this PR and make sure that all docs (readmes, comments, etc) are aligned with the actual code and placement in the project.
Which issue does this PR close?
Closes #3174
Rationale
Sources (e.g. Postgres) wrap row data in a
DatabaseRecordenvelope while sinks (e.g. Iceberg) expect flat JSON matching the target table schema — no shared contract exists, producing silent null failures.What changed?
The Postgres source emits
{table_name, operation_type, timestamp, data: {...}, old_data}envelopes, but the Iceberg sink's Arrow JSON reader maps these nested structures to top-level fields as null, silently violating non-nullable constraints.This adds a reusable
unwrap_envelopetransform to the connector SDK that extracts a nested field (e.g.data) and promotes it as the top-level payload, plus explicit envelope detection in the Iceberg sink that errors with an actionable message instead of failing silently.Local Execution
AI Usage