Skip to content

Fixes #28755: [Lineage] dedup queries and treat already-present query conflicts as warnings#28757

Open
mohittilala wants to merge 4 commits into
mainfrom
lineage-bulk-query-dup-conflicts
Open

Fixes #28755: [Lineage] dedup queries and treat already-present query conflicts as warnings#28757
mohittilala wants to merge 4 commits into
mainfrom
lineage-bulk-query-dup-conflicts

Conversation

@mohittilala
Copy link
Copy Markdown
Contributor

@mohittilala mohittilala commented Jun 5, 2026

Describe your changes:

Fixes #28755

I worked on the metadata-rest sink so a database lineage run stops going red over duplicate Query entities, because those conflicts lose no metadata. A lineage workflow stores each parsed SQL as a Query whose name = md5(query_text); identical SQL (e.g. a scheduled stored procedure running the same statement) produces identical checksums, which surface as two non-fatal bulk-Query failures that still fail the workflow:

  • Entity does not exist and could not be created — the same checksum sent multiple times in one bulk batch.
  • duplicate key value violates unique constraint "unique_query_checksum" — a checksum that already exists is re-inserted under overlapping/concurrent bulk requests.

Both changes are ingestion-side so they help users who cannot upgrade the server (the server-side idempotency fix #25890 only shipped in 1.13.0).

Type of change:

  • Bug fix

High-level design:

Queries get a dedicated bulk path in the sink so nothing query-specific leaks into the shared generic bulk methods. The generic path (write_create_request, _flush_buffer, _track_entity_in_buffer, _is_duplicate_in_buffer) is byte-identical to upstream main.

  • write_query dispatch handler (@_run_dispatch.register on CreateQueryRequest): queries are buffered in a dedicated query_buffer keyed on get_query_checksum(query) (they carry no name; the server derives it from the SQL checksum). Identical SQL is deduped here at the source, removing the within-batch duplicate (Error A).
  • _flush_query_buffer / _record_query_flush_result: bulk-create the query buffer; a failed response that matches a query_entity unique constraint (unique_query_checksum, query_entity_namehash_key, query_entity.nameHash; both engines) is an already-present query that lost no metadata, so it is recorded as status.warning() (not counted in Success %) instead of status.failed() (Error B). Any other failure is still a hard failure.
  • write_barrier and close flush the query buffer alongside the entity buffer.
  • Temporary: the duplicate-checksum suppression is gated behind a TODO referencing server fix Fix query duplicate throwing error in bulk apis #25890 (shipped in 1.13.0); it can be removed once 1.13.0 reaches all deployments. The checksum dedup is permanent.

Tests:

Use cases covered

  • A lineage run that re-ingests identical SQL (same query checksum) no longer reports Errors for the resulting duplicate-key bulk-Query conflicts.
  • Identical query text within one bulk batch is buffered once instead of N times.
  • A genuine bulk failure (e.g. validation error) is still reported as a failure.

Unit tests

  • I added unit tests for the new/changed logic.
  • Files added/updated:
    • ingestion/tests/unit/test_sink_deduplication.py — identical query text deduped to one entry in query_buffer (generic buffer untouched); different text not deduped.
    • ingestion/tests/unit/test_sink_bulk_conflict_warning.py — already-present-query conflicts (checksum + nameHash, both engines) become warnings (status green); genuine failures still recorded as failures; is_duplicate_query_conflict classifier behaviour.
  • Command: make unit_ingestion

Backend integration tests

  • Not applicable (no backend API changes).

Ingestion integration tests

  • Not applicable — behaviour is fully covered by the sink unit tests above; no connector protocol change.

Playwright (UI) tests

  • Not applicable (no UI changes).

Manual testing performed

  1. Reproduced both errors on a real 1.12.10 server via PUT /api/v1/queries/bulk with the same query text under different names: rowsFailed=2 (unique_query_checksum + Entity does not exist and could not be created).
  2. Confirmed a main/1.13 server swallows the duplicate-key (rowsFailed=0) via Fix query duplicate throwing error in bulk apis #25890, establishing the version boundary.
  3. Ran the sink unit suite: dedup + warning-downgrade tests pass, no regressions across existing sink tests.

UI screen recording / screenshots:

Not applicable.

Checklist:

  • I have read the CONTRIBUTING document.
  • My PR title is Fixes <issue-number>: <short explanation>
  • My PR is linked to a GitHub issue via Fixes #<issue-number> above.
  • I have commented on my code, particularly in hard-to-understand areas.
  • For JSON Schema changes: I updated the migration scripts or explained why it is not needed.
  • For UI changes: I attached a screen recording and/or screenshots above.
  • I have added tests (unit / integration / Playwright as applicable) and listed them above.
  • I have added a test that covers the exact scenario we are fixing.

@mohittilala mohittilala requested a review from a team as a code owner June 5, 2026 12:33
Copilot AI review requested due to automatic review settings June 5, 2026 12:33
@mohittilala mohittilala self-assigned this Jun 5, 2026
@mohittilala mohittilala added python Pull requests that update python code safe to test Add this label to run secure Github workflows on PRs To release Will cherry-pick this PR into the release branch labels Jun 5, 2026
Comment thread ingestion/src/metadata/ingestion/sink/metadata_rest.py Outdated
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR updates the ingestion-side MetadataRestSink to reduce false-negative lineage workflow failures caused by duplicate Query entities during bulk flushes, primarily on OpenMetadata server versions that don’t fully handle Query idempotency in bulk.

Changes:

  • Deduplicate CreateQueryRequest entries within the sink buffer using a checksum-based key (since Query “name” is derived server-side).
  • Reclassify selected bulk-flush conflicts as warnings (instead of failures) so runs don’t go red when the conflict is intended to be idempotent/benign.
  • Add unit tests covering query-buffer deduplication and the benign-conflict warning behavior.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

File Description
ingestion/src/metadata/ingestion/sink/metadata_rest.py Adds Query-aware buffer dedup keys and downgrades certain bulk failures to warnings.
ingestion/tests/unit/test_sink_deduplication.py Adds tests verifying identical SQL Query requests are deduped within the buffer.
ingestion/tests/unit/test_sink_bulk_conflict_warning.py Adds tests verifying certain bulk conflict messages become warnings while real errors remain failures.

Comment on lines +136 to +140
BENIGN_BULK_CONFLICT_MARKERS = (
"duplicate key value violates unique constraint", # PostgreSQL unique_violation (23505)
"Duplicate entry", # MySQL ER_DUP_ENTRY (1062)
"Entity does not exist and could not be created", # within-batch duplicate whose create lost
)
Comment on lines +386 to +390
@staticmethod
def _is_benign_bulk_failure(message: str | None) -> bool:
"""A bulk failure is benign when the entity already exists or is a duplicate within
the same batch - no metadata is lost, so it must not fail the workflow."""
return bool(message) and any(marker in message for marker in BENIGN_BULK_CONFLICT_MARKERS)
@mohittilala mohittilala changed the title Fixes #28755: [Lineage] dedup queries in sink buffer; report benign bulk conflicts as warnings Fixes #28755: [Lineage] dedup queries and treat already-present query conflicts as warnings Jun 5, 2026
Comment thread ingestion/src/metadata/ingestion/sink/metadata_rest.py Outdated
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Jun 5, 2026

🟡 Playwright Results — all passed (11 flaky)

✅ 4271 passed · ❌ 0 failed · 🟡 11 flaky · ⏭️ 88 skipped

Shard Passed Failed Flaky Skipped
✅ Shard 1 301 0 0 4
✅ Shard 2 804 0 0 9
🟡 Shard 3 805 0 2 8
🟡 Shard 4 853 0 2 12
🟡 Shard 5 720 0 1 47
🟡 Shard 6 788 0 6 8
🟡 11 flaky test(s) (passed on retry)
  • Features/RTL.spec.ts › Verify Following widget functionality (shard 3, 1 retry)
  • Flow/ExploreAggregationCountsMatching.spec.ts › should verify left panel counts and tab search results for normal search (shard 3, 1 retry)
  • Pages/CustomProperties.spec.ts › Hyperlink (shard 4, 1 retry)
  • Pages/CustomProperties.spec.ts › Should verify property name is visible for apiCollection in right panel (shard 4, 1 retry)
  • Pages/ExplorePageRightPanel_KnowledgeCenter.spec.ts › Should remove user owner for knowledgeCenter (shard 5, 1 retry)
  • Pages/Glossary.spec.ts › Column dropdown drag-and-drop functionality for Glossary Terms table (shard 6, 1 retry)
  • Pages/Lineage/LineageFilters.spec.ts › Verify lineage schema filter selection (shard 6, 1 retry)
  • Pages/Lineage/LineageRightPanel.spec.ts › Verify custom properties tab IS visible for supported type: searchIndex (shard 6, 1 retry)
  • Pages/Lineage/PlatformLineage.spec.ts › Verify domain platform view (shard 6, 1 retry)
  • Pages/ODCSImportExport.spec.ts › Import ODCS with mustBeBetween quality rules (shard 6, 1 retry)
  • Pages/ServiceEntity.spec.ts › Tier Add, Update and Remove (shard 6, 1 retry)

📦 Download artifacts

How to debug locally
# Download playwright-test-results-<shard> artifact and unzip
npx playwright show-trace path/to/trace.zip    # view trace

Copilot AI review requested due to automatic review settings June 5, 2026 23:09
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

Comment on lines +406 to +410
checksum = get_query_checksum(model_str(record.query))
result = Either(right=None) # pyright: ignore[reportCallIssue]
if checksum in self.buffered_query_checksums:
logger.debug(f"Skipping duplicate query with checksum {checksum}")
else:
@gitar-bot
Copy link
Copy Markdown

gitar-bot Bot commented Jun 5, 2026

Code Review ✅ Approved 2 resolved / 2 findings

Introduces a dedicated bulk sink path to deduplicate queries and demote benign unique-constraint conflicts to warnings. This resolves the masking of genuine bulk failures and ensures consistent flush handling across all entity types.

✅ 2 resolved
Bug: Benign-conflict markers mask genuine failures for all bulk entity types

📄 ingestion/src/metadata/ingestion/sink/metadata_rest.py:136-140 📄 ingestion/src/metadata/ingestion/sink/metadata_rest.py:372-386 📄 ingestion/src/metadata/ingestion/sink/metadata_rest.py:247-261
_record_bulk_failure/_is_benign_bulk_failure are applied to every failedRequest in _flush_buffer, regardless of which entity type produced the conflict. The buffer is not Query-only — write_create_request routes many CreateRequest types (e.g. CreateDashboardDataModelRequest, tables, etc.) through the same bulk path. The marker "Entity does not exist and could not be created" is a generic server message that can legitimately indicate a real failure for a non-Query entity (e.g. a referenced parent/container could not be created), not just a within-batch duplicate. Because the classification is done purely by substring match against the message and is not scoped to CreateQueryRequest, such genuine failures are silently downgraded to status.warning(), so the workflow stays green and the error is hidden from the user.

Note that the within-batch duplicate case (the original source of the "Entity does not exist" message for queries) is now eliminated by the new buffer dedup, making reliance on this broad marker both risky and largely redundant. Consider scoping the benign downgrade to query-checksum conflicts (e.g. only when the response request maps to a CreateQueryRequest, or restrict to the duplicate-key/unique-constraint markers which are far more specific) and dropping or tightening the generic "Entity does not exist and could not be created" marker.

Bug: write_barrier drops query-flush Either when entity flush succeeds

📄 ingestion/src/metadata/ingestion/sink/metadata_rest.py:589-603
In write_barrier, the query-flush result is only adopted when the entity-buffer flush produced a neutral Either (result.right is None and result.left is None). Since a successful _flush_buffer() returns Either(right=<bulkresult>, left=None), result.right is not None, so when both buffers are flushed and the entity flush succeeds, the query_result (including any left produced by genuine non-benign query failures in _record_query_flush_result) is discarded from the barrier's return value.

Impact is limited because _record_query_flush_result already calls self.status.failed(...) for genuine failures, so the workflow Success% still reflects them. However, the barrier record's own Either no longer surfaces the query failure, making the return value inconsistent with the recorded status. Consider combining both results so a query failure is propagated even when the entity flush succeeded (e.g. prefer a left from either flush).

Options

Display: compact → Showing less information.

Comment with these commands to change:

Compact
gitar display:verbose         

Was this helpful? React with 👍 / 👎 | Gitar

@sonarqubecloud
Copy link
Copy Markdown

sonarqubecloud Bot commented Jun 6, 2026

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Ingestion python Pull requests that update python code safe to test Add this label to run secure Github workflows on PRs To release Will cherry-pick this PR into the release branch

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Lineage] Bulk Query create reports duplicate-key & "Entity does not exist" as errors and leads to failed lineage status on 1.12.x

2 participants