Fixes #28755: [Lineage] dedup queries and treat already-present query conflicts as warnings#28757
Fixes #28755: [Lineage] dedup queries and treat already-present query conflicts as warnings#28757mohittilala wants to merge 4 commits into
Conversation
There was a problem hiding this comment.
Pull request overview
This PR updates the ingestion-side MetadataRestSink to reduce false-negative lineage workflow failures caused by duplicate Query entities during bulk flushes, primarily on OpenMetadata server versions that don’t fully handle Query idempotency in bulk.
Changes:
- Deduplicate
CreateQueryRequestentries within the sink buffer using a checksum-based key (since Query “name” is derived server-side). - Reclassify selected bulk-flush conflicts as warnings (instead of failures) so runs don’t go red when the conflict is intended to be idempotent/benign.
- Add unit tests covering query-buffer deduplication and the benign-conflict warning behavior.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
ingestion/src/metadata/ingestion/sink/metadata_rest.py |
Adds Query-aware buffer dedup keys and downgrades certain bulk failures to warnings. |
ingestion/tests/unit/test_sink_deduplication.py |
Adds tests verifying identical SQL Query requests are deduped within the buffer. |
ingestion/tests/unit/test_sink_bulk_conflict_warning.py |
Adds tests verifying certain bulk conflict messages become warnings while real errors remain failures. |
| BENIGN_BULK_CONFLICT_MARKERS = ( | ||
| "duplicate key value violates unique constraint", # PostgreSQL unique_violation (23505) | ||
| "Duplicate entry", # MySQL ER_DUP_ENTRY (1062) | ||
| "Entity does not exist and could not be created", # within-batch duplicate whose create lost | ||
| ) |
| @staticmethod | ||
| def _is_benign_bulk_failure(message: str | None) -> bool: | ||
| """A bulk failure is benign when the entity already exists or is a duplicate within | ||
| the same batch - no metadata is lost, so it must not fail the workflow.""" | ||
| return bool(message) and any(marker in message for marker in BENIGN_BULK_CONFLICT_MARKERS) |
🟡 Playwright Results — all passed (11 flaky)✅ 4271 passed · ❌ 0 failed · 🟡 11 flaky · ⏭️ 88 skipped
🟡 11 flaky test(s) (passed on retry)
How to debug locally# Download playwright-test-results-<shard> artifact and unzip
npx playwright show-trace path/to/trace.zip # view trace |
| checksum = get_query_checksum(model_str(record.query)) | ||
| result = Either(right=None) # pyright: ignore[reportCallIssue] | ||
| if checksum in self.buffered_query_checksums: | ||
| logger.debug(f"Skipping duplicate query with checksum {checksum}") | ||
| else: |
Code Review ✅ Approved 2 resolved / 2 findingsIntroduces a dedicated bulk sink path to deduplicate queries and demote benign unique-constraint conflicts to warnings. This resolves the masking of genuine bulk failures and ensures consistent flush handling across all entity types. ✅ 2 resolved✅ Bug: Benign-conflict markers mask genuine failures for all bulk entity types
✅ Bug: write_barrier drops query-flush Either when entity flush succeeds
OptionsDisplay: compact → Showing less information. Comment with these commands to change:
Was this helpful? React with 👍 / 👎 | Gitar |
|



Describe your changes:
Fixes #28755
I worked on the metadata-rest sink so a database lineage run stops going red over duplicate Query entities, because those conflicts lose no metadata. A lineage workflow stores each parsed SQL as a Query whose
name = md5(query_text); identical SQL (e.g. a scheduled stored procedure running the same statement) produces identical checksums, which surface as two non-fatal bulk-Query failures that still fail the workflow:Entity does not exist and could not be created— the same checksum sent multiple times in one bulk batch.duplicate key value violates unique constraint "unique_query_checksum"— a checksum that already exists is re-inserted under overlapping/concurrent bulk requests.Both changes are ingestion-side so they help users who cannot upgrade the server (the server-side idempotency fix #25890 only shipped in 1.13.0).
Type of change:
High-level design:
Queries get a dedicated bulk path in the sink so nothing query-specific leaks into the shared generic bulk methods. The generic path (
write_create_request,_flush_buffer,_track_entity_in_buffer,_is_duplicate_in_buffer) is byte-identical to upstreammain.write_querydispatch handler (@_run_dispatch.registeronCreateQueryRequest): queries are buffered in a dedicatedquery_bufferkeyed onget_query_checksum(query)(they carry noname; the server derives it from the SQL checksum). Identical SQL is deduped here at the source, removing the within-batch duplicate (Error A)._flush_query_buffer/_record_query_flush_result: bulk-create the query buffer; a failed response that matches aquery_entityunique constraint (unique_query_checksum,query_entity_namehash_key,query_entity.nameHash; both engines) is an already-present query that lost no metadata, so it is recorded asstatus.warning()(not counted inSuccess %) instead ofstatus.failed()(Error B). Any other failure is still a hard failure.write_barrierandcloseflush the query buffer alongside the entity buffer.TODOreferencing server fix Fix query duplicate throwing error in bulk apis #25890 (shipped in 1.13.0); it can be removed once 1.13.0 reaches all deployments. The checksum dedup is permanent.Tests:
Use cases covered
Errorsfor the resulting duplicate-key bulk-Query conflicts.Unit tests
ingestion/tests/unit/test_sink_deduplication.py— identical query text deduped to one entry inquery_buffer(genericbufferuntouched); different text not deduped.ingestion/tests/unit/test_sink_bulk_conflict_warning.py— already-present-query conflicts (checksum + nameHash, both engines) become warnings (status green); genuine failures still recorded as failures;is_duplicate_query_conflictclassifier behaviour.make unit_ingestionBackend integration tests
Ingestion integration tests
Playwright (UI) tests
Manual testing performed
PUT /api/v1/queries/bulkwith the same query text under different names:rowsFailed=2(unique_query_checksum+Entity does not exist and could not be created).rowsFailed=0) via Fix query duplicate throwing error in bulk apis #25890, establishing the version boundary.UI screen recording / screenshots:
Not applicable.
Checklist:
Fixes <issue-number>: <short explanation>Fixes #<issue-number>above.