Add upsert support for offline tables #17789

Merged
xiangfu0 merged 1 commit into apache:master from xiangfu0:feature/offline-table-upsert-support
Mar 8, 2026

@xiangfu0 xiangfu0 commented Mar 1, 2026

Summary

Extends Apache Pinot's upsert (primary-key deduplication) to OFFLINE tables, a capability previously limited to REALTIME tables. This enables batch-ingested data to leverage primary-key-based deduplication with the same semantics.

Motivation

Users with batch data pipelines need deduplication on primary keys across segment uploads. Previously this required a REALTIME table with segment push, which adds unnecessary streaming infrastructure overhead. Offline upsert allows a simpler batch-only architecture while preserving upsert guarantees.

Key design decisions

  • Three-level comparison column fallback: configured comparison columns → time column → segment creation/push time. This ensures offline tables without an explicit comparison column still have deterministic conflict resolution.
  • Partition-aware segment assignment is required: offline upsert tables must configure segmentPartitionConfig so all segments of a partition land on the same server. This is validated at table creation time.
  • Shared upsert logic in base class: the upsert lifecycle methods were duplicated between RealtimeTableDataManager and the new OfflineTableDataManager. These are now consolidated into BaseTableDataManager, and TableDataManager interface methods replace instanceof checks.
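The three-level fallback described in the first bullet can be sketched as follows. This is a minimal illustration, not the code in BaseTableUpsertMetadataManager: the class name `ComparisonColumnFallback` and the method `resolve` are hypothetical, and the only behavior taken from the description is the priority order and the use of an empty list to signal segment-creation-time mode.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical helper; the names are illustrative and not Pinot's actual API. */
final class ComparisonColumnFallback {
  private ComparisonColumnFallback() {
  }

  /**
   * Resolves the comparison columns in priority order:
   * 1. the explicitly configured comparison columns,
   * 2. the table's time column,
   * 3. an empty list, which signals "compare by segment creation/push time".
   */
  static List<String> resolve(List<String> configuredColumns, String timeColumn) {
    if (configuredColumns != null && !configuredColumns.isEmpty()) {
      return configuredColumns;
    }
    if (timeColumn != null && !timeColumn.isEmpty()) {
      return Collections.singletonList(timeColumn);
    }
    return Collections.emptyList();
  }

  public static void main(String[] args) {
    System.out.println(resolve(List.of("version"), "ts")); // prints [version]
    System.out.println(resolve(null, "ts"));               // prints [ts]
    System.out.println(resolve(null, null));               // prints []
  }
}
```

Returning an empty list rather than null keeps downstream code on a single "non-null" contract, which matches the UpsertContext change listed under Changes.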

Changes

Core — Offline upsert support

| File | Change |
| --- | --- |
| OfflineTableDataManager | Full upsert lifecycle: init, addSegment, shutdown with stop-before-close ordering |
| BaseTableUpsertMetadataManager | Three-level comparison column fallback (configured → time column → empty list for segment-creation-time mode) |
| UpsertUtils | Add ConstantComparisonColumnReader for segment-creation-time-based comparison |
| UpsertContext | Allow empty comparison columns (non-null check only) |
| BasePartitionUpsertMetadataManager | createRecordInfoReader() helper with segment creation time fallback; guard TTL paths against empty comparison columns; prefer zkPushTime for offline tables in getAuthoritativeUpdateOrCreationTime() |
| SegmentMetadataImpl | Add zkPushTime field for offline upsert consistency |
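To illustrate how a constant comparison value can stand in for a missing comparison column, here is a small self-contained sketch. It is not the ConstantComparisonColumnReader added in UpsertUtils: the types `ComparisonReader` and `RecordLocation`, the `long` comparison value, and the tie-breaking rule (the incoming record wins on equal values) are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of latest-wins resolution with a per-segment constant value. */
final class ConstantComparisonSketch {
  /** Reads the comparison value for one document of a segment. */
  interface ComparisonReader {
    long read(int docId);
  }

  /** Returns the same value (e.g. segment creation time in ms) for every document. */
  static ComparisonReader constant(long segmentCreationTimeMs) {
    return docId -> segmentCreationTimeMs;
  }

  /** Location and comparison value of the currently winning record for a key. */
  static final class RecordLocation {
    final String segmentName;
    final int docId;
    final long comparisonValue;

    RecordLocation(String segmentName, int docId, long comparisonValue) {
      this.segmentName = segmentName;
      this.docId = docId;
      this.comparisonValue = comparisonValue;
    }
  }

  private final Map<String, RecordLocation> _latest = new HashMap<>();

  /** Registers every document of a segment; the larger comparison value wins (assumed: ties go to the incoming record). */
  void addSegment(String segmentName, String[] primaryKeys, ComparisonReader reader) {
    for (int docId = 0; docId < primaryKeys.length; docId++) {
      RecordLocation candidate = new RecordLocation(segmentName, docId, reader.read(docId));
      _latest.merge(primaryKeys[docId], candidate,
          (current, incoming) -> incoming.comparisonValue >= current.comparisonValue ? incoming : current);
    }
  }

  RecordLocation lookup(String primaryKey) {
    return _latest.get(primaryKey);
  }
}
```

With this shape, the order in which segments are loaded does not change the winner: a segment created at time 2000 beats one created at time 1000 for the same key regardless of which is added first.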

Refactoring — Shared upsert logic

| File | Change |
| --- | --- |
| BaseTableDataManager | Pull up handleUpsert, replaceUpsertSegment, registerSegment, setZkOperationTimeIfAvailable, getSegmentContexts, isUpsertEnabled, getPartitionToPrimaryKeyCount from RealtimeTableDataManager |
| RealtimeTableDataManager | Remove duplicated upsert methods (now inherited from base class); remove _tableUpsertMetadataManager field (now in base) |
| TableDataManager | Add isUpsertEnabled(), getTableUpsertMetadataManager(), getPartitionToPrimaryKeyCount() with default implementations |
| SingleTableExecutionInfo | Use tableDataManager.isUpsertEnabled() instead of instanceof RealtimeTableDataManager; add null-safety Preconditions.checkState |
| PrimaryKeyCount | Use tableDataManager.getPartitionToPrimaryKeyCount() instead of instanceof cast |
| TablesResource | Same simplification as PrimaryKeyCount |
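The pattern behind this refactoring, interface default methods in place of instanceof checks, can be sketched in isolation. The interface below is a simplified stand-in, not Pinot's TableDataManager: only the method names `isUpsertEnabled` and `getPartitionToPrimaryKeyCount` come from the PR, while the return type `Map<Integer, Long>` and the classes `PlainTable` and `UpsertTable` are assumptions.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical sketch: callers query the interface instead of casting to a concrete manager. */
final class UpsertInterfaceSketch {
  private UpsertInterfaceSketch() {
  }

  /** Simplified stand-in for the table data manager interface. */
  interface TableDataManager {
    default boolean isUpsertEnabled() {
      return false;
    }

    default Map<Integer, Long> getPartitionToPrimaryKeyCount() {
      return Collections.emptyMap();
    }
  }

  /** A table without upsert simply inherits the defaults. */
  static final class PlainTable implements TableDataManager {
  }

  /** An upsert table overrides both methods. */
  static final class UpsertTable implements TableDataManager {
    private final Map<Integer, Long> _counts;

    UpsertTable(Map<Integer, Long> counts) {
      _counts = counts;
    }

    @Override
    public boolean isUpsertEnabled() {
      return true;
    }

    @Override
    public Map<Integer, Long> getPartitionToPrimaryKeyCount() {
      return _counts;
    }
  }

  /** Caller code works for any table type, with no instanceof check or cast. */
  static long totalPrimaryKeys(TableDataManager manager) {
    if (!manager.isUpsertEnabled()) {
      return 0L;
    }
    long total = 0L;
    for (long count : manager.getPartitionToPrimaryKeyCount().values()) {
      total += count;
    }
    return total;
  }
}
```

The benefit is that a new table type with upsert support needs no changes in callers such as resource or metrics code.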

Validation

| File | Change |
| --- | --- |
| TableConfigUtils | Remove REALTIME-only restriction; require segmentPartitionConfig for offline upsert; scope tenant tag override and COMPLETED instance partition checks to REALTIME only; validate TTL requires comparison column or time column |
| TableConfigUtilsTest | Update expected error messages for new validation rules |
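Two of the validation rules listed for TableConfigUtils can be expressed as a short sketch. This is not the actual validation code: the class `TableConfigView`, its boolean fields, and the error messages are hypothetical, and only the two rules themselves (offline upsert requires segmentPartitionConfig; TTL requires a comparison column or a time column) are taken from the description.

```java
/** Hypothetical sketch of two offline-upsert validation rules. */
final class OfflineUpsertValidationSketch {
  private OfflineUpsertValidationSketch() {
  }

  /** Minimal, made-up view of the table config fields that the checks need. */
  static final class TableConfigView {
    final boolean offline;
    final boolean upsertEnabled;
    final boolean hasSegmentPartitionConfig;
    final boolean ttlEnabled;
    final boolean hasComparisonColumn;
    final boolean hasTimeColumn;

    TableConfigView(boolean offline, boolean upsertEnabled, boolean hasSegmentPartitionConfig,
        boolean ttlEnabled, boolean hasComparisonColumn, boolean hasTimeColumn) {
      this.offline = offline;
      this.upsertEnabled = upsertEnabled;
      this.hasSegmentPartitionConfig = hasSegmentPartitionConfig;
      this.ttlEnabled = ttlEnabled;
      this.hasComparisonColumn = hasComparisonColumn;
      this.hasTimeColumn = hasTimeColumn;
    }
  }

  /** Throws IllegalStateException when a rule is violated; the messages are illustrative only. */
  static void validate(TableConfigView config) {
    if (!config.upsertEnabled) {
      return;
    }
    if (config.offline && !config.hasSegmentPartitionConfig) {
      throw new IllegalStateException("Offline upsert table must configure segmentPartitionConfig");
    }
    if (config.ttlEnabled && !config.hasComparisonColumn && !config.hasTimeColumn) {
      throw new IllegalStateException("Upsert TTL requires a comparison column or a time column");
    }
  }
}
```

Rejecting TTL without any comparable column at validation time avoids having to evaluate TTL against a segment-creation-time value at runtime.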

Testing

File Change
OfflineUpsertTableIntegrationTest End-to-end integration test: dedup query results, skipUpsert option, segment replacement with updated records

Configuration example

{
  "tableName": "myTable_OFFLINE",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "timeColumnName": "timestampInEpoch",
    "replicaGroupStrategyConfig": {
      "partitionColumn": "userId",
      "numInstancesPerPartition": 1
    }
  },
  "tableIndexConfig": {
    "segmentPartitionConfig": {
      "columnPartitionMap": {
        "userId": { "functionName": "Murmur", "numPartitions": 4 }
      }
    }
  },
  "upsertConfig": { "mode": "FULL" },
  "routing": {
    "instanceSelectorType": "strictReplicaGroup"
  }
}

Test plan

  • TableConfigUtilsTest — 48 tests pass (updated expected error messages for OFFLINE tables)
  • BasePartitionUpsertMetadataManagerTest — 14 tests pass
  • ConcurrentMapPartitionUpsertMetadataManagerTest — 24 tests pass
  • ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest — 12 tests pass
  • OfflineUpsertTableIntegrationTest — new integration test validates end-to-end offline upsert (dedup query results, segment replacement, skipUpsert option)
  • Verified no behavior change on RT side — shared methods are identical to the removed RT private methods

🤖 Generated with Claude Code

@xiangfu0 xiangfu0 force-pushed the feature/offline-table-upsert-support branch from 5449717 to a966e7f Compare March 1, 2026 05:18

codecov-commenter commented Mar 1, 2026

Codecov Report

❌ Patch coverage is 26.62338% with 113 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.25%. Comparing base (7eec9a7) to head (e549738).
⚠️ Report is 2 commits behind head on master.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| .../pinot/core/data/manager/BaseTableDataManager.java | 1.49% | 63 Missing and 3 partials ⚠️ |
| .../data/manager/offline/OfflineTableDataManager.java | 30.00% | 9 Missing and 5 partials ⚠️ |
| ...cal/upsert/BasePartitionUpsertMetadataManager.java | 50.00% | 3 Missing and 5 partials ⚠️ |
| ...he/pinot/segment/local/utils/TableConfigUtils.java | 46.66% | 1 Missing and 7 partials ⚠️ |
| .../core/query/executor/SingleTableExecutionInfo.java | 0.00% | 3 Missing and 1 partial ⚠️ |
| ...egment/spi/index/metadata/SegmentMetadataImpl.java | 0.00% | 4 Missing ⚠️ |
| ...t/segment/local/data/manager/TableDataManager.java | 0.00% | 3 Missing ⚠️ |
| ...apache/pinot/segment/local/upsert/UpsertUtils.java | 75.00% | 2 Missing and 1 partial ⚠️ |
| ...t/local/upsert/BaseTableUpsertMetadataManager.java | 50.00% | 1 Missing and 1 partial ⚠️ |
| ...ache/pinot/segment/local/upsert/UpsertContext.java | 0.00% | 0 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #17789      +/-   ##
============================================
- Coverage     63.29%   63.25%   -0.05%     
  Complexity     1466     1466              
============================================
  Files          3190     3190              
  Lines        191904   191963      +59     
  Branches      29390    29410      +20     
============================================
- Hits         121474   121428      -46     
- Misses        60940    61027      +87     
- Partials       9490     9508      +18     
| Flag | Coverage Δ |
| --- | --- |
| custom-integration1 | 100.00% <ø> (ø) |
| integration | 100.00% <ø> (ø) |
| integration1 | 100.00% <ø> (ø) |
| integration2 | 0.00% <ø> (ø) |
| java-11 | 63.22% <26.62%> (-0.03%) ⬇️ |
| java-21 | 63.22% <26.62%> (-0.04%) ⬇️ |
| temurin | 63.25% <26.62%> (-0.05%) ⬇️ |
| unittests | 63.25% <26.62%> (-0.05%) ⬇️ |
| unittests1 | 55.56% <6.00%> (-0.05%) ⬇️ |
| unittests2 | 34.25% <26.62%> (-0.02%) ⬇️ |

Copilot AI left a comment

Pull request overview

Extends Pinot’s upsert (primary-key deduplication) feature to OFFLINE tables, enabling batch-ingested segments to participate in upsert semantics, including comparison-column fallback logic and server-side introspection APIs.

Changes:

  • Enable/validate upsert & dedup configurations for OFFLINE tables and allow empty comparison columns (fallback behavior).
  • Add comparison fallback down to segment creation time and wire segment-context/upsert-metadata paths for offline querying.
  • Extend server APIs/metrics paths to surface primary-key counts for offline upsert tables and add an offline upsert integration test.

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java | Relaxes REALTIME-only validation and adjusts validations for OFFLINE upsert/dedup. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java | Adds comparison-column fallback: configured → time column → empty (segment creation time). |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java | Allows empty comparison-column list (non-null requirement only). |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java | Adds constant comparison-value reader to support segment-creation-time comparison. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java | Creates RecordInfoReader with segment creation time when comparison columns are empty; adjusts TTL gating. |
| pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java | Implements upsert lifecycle for offline tables (init/add/replace/segment contexts/shutdown). |
| pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java | Includes offline upsert tables in the “newly added segments” and segment-context path. |
| pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java | Exposes offline partition→PK-count in table metadata response. |
| pinot-server/src/main/java/org/apache/pinot/server/api/resources/PrimaryKeyCount.java | Counts PKs for offline upsert tables on the server instance. |
| pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java | Updates tests for relaxed OFFLINE validation and updated error messages. |
| pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineUpsertTableIntegrationTest.java | Adds end-to-end integration test coverage for offline upsert. |

@xiangfu0 xiangfu0 requested a review from Copilot March 1, 2026 09:32
@xiangfu0 xiangfu0 added feature upsert Related to upsert functionality labels Mar 1, 2026
Copilot AI left a comment

Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.

@rohityadav1993

Do we intend to provide Hybrid tables with upsert?

We have had this idea, but we never really had a strong requirement at Uber for offline upsert tables.
For example, batch-ingested data generally comes from an offline data source. Offline data sources are generally not updated, and in case we do have updates to our offline data, we can re-ingest it and replace the existing segment, using deterministic segment naming, in an offline table.

xiangfu0 commented Mar 4, 2026

> Do we intend to provide Hybrid tables with upsert?
>
> Have had this idea but we never really had a strong requirement at Uber for Offline upsert tables. e.g. for batch ingested data, it is generally an offline data source. Offline data sources are generally not updated and in case we have updates to our offline data we can re-ingest and replace the existing segment with deterministic naming in an offline table.

I don't think we need a hybrid upsert table. Technically it's equivalent to a realtime upsert table, since you can push segments to a realtime table.

Jackie-Jiang left a comment


We are unifying the behavior of RT and OFFLINE tables. Let's pull the common parts out into a base class, and the common API into an interface.

@xiangfu0 xiangfu0 force-pushed the feature/offline-table-upsert-support branch 3 times, most recently from 69acf78 to 350fc37 Compare March 6, 2026 19:05
Jackie-Jiang left a comment


LGTM. Please do another round of checks to ensure there is no behavior change on the RT side.

@xiangfu0 xiangfu0 requested a review from Copilot March 7, 2026 05:26
@xiangfu0 xiangfu0 force-pushed the feature/offline-table-upsert-support branch from 350fc37 to a7c7457 Compare March 7, 2026 05:28
Copilot AI left a comment


Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (1)

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:370

  • TTL handling is only partially guarded for the empty-comparison-columns (segment-creation-time) mode. While doAddSegment()/doPreloadSegment()/doReplaceSegment() now skip TTL when _comparisonColumns is empty, removeSegment() still calls isOutOfMetadataTTL(segment) which unconditionally dereferences _comparisonColumns.get(0) via getMaxComparisonValue(). This can throw at runtime for TTL-enabled tables with empty comparison columns. Either disallow TTL when comparison columns are empty at validation time, or update TTL-related helpers (including isOutOfMetadataTTL(IndexSegment)) to handle the empty-comparison-columns mode safely.
    if (isTTLEnabled() && !_comparisonColumns.isEmpty()) {
      double maxComparisonValue = getMaxComparisonValue(segment);
      _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, maxComparisonValue));
      if (isOutOfMetadataTTL(maxComparisonValue) && skipAddSegmentOutOfTTL(segment)) {
        return;
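One of the two options named in the comment above, making the TTL helper safe for empty comparison columns, could look roughly like the sketch below. It is not the code in BasePartitionUpsertMetadataManager: the class `TtlGuardSketch`, the `DoubleSupplier` parameter, and the exact out-of-TTL formula are assumptions. The point is only that the emptiness check runs before anything that would read the first comparison column.

```java
import java.util.List;
import java.util.function.DoubleSupplier;

/** Hypothetical sketch of a TTL check that tolerates an empty comparison-column list. */
final class TtlGuardSketch {
  private final List<String> _comparisonColumns;
  private final double _metadataTTL;
  private final double _largestSeenComparisonValue;

  TtlGuardSketch(List<String> comparisonColumns, double metadataTTL, double largestSeenComparisonValue) {
    _comparisonColumns = comparisonColumns;
    _metadataTTL = metadataTTL;
    _largestSeenComparisonValue = largestSeenComparisonValue;
  }

  /**
   * Returns true only when TTL is enabled, a comparison column exists, and the segment's
   * max comparison value is older than (largest seen value - TTL). The supplier is invoked
   * lazily, so nothing reads a comparison column when the list is empty.
   */
  boolean isOutOfMetadataTTL(DoubleSupplier maxComparisonValue) {
    if (_metadataTTL <= 0 || _comparisonColumns.isEmpty()) {
      return false;
    }
    return maxComparisonValue.getAsDouble() < _largestSeenComparisonValue - _metadataTTL;
  }
}
```

The other option from the comment, rejecting TTL at validation time when no comparison or time column exists, is the one reflected in the TableConfigUtils changes listed in the PR description.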

@xiangfu0 xiangfu0 force-pushed the feature/offline-table-upsert-support branch 5 times, most recently from 44276bb to 9263180 Compare March 8, 2026 02:18
Copilot AI left a comment


Pull request overview

Copilot reviewed 17 out of 17 changed files in this pull request and generated 3 comments.

@xiangfu0 xiangfu0 force-pushed the feature/offline-table-upsert-support branch 2 times, most recently from d2d2e69 to 7b06e42 Compare March 8, 2026 03:34
@xiangfu0 xiangfu0 requested a review from Copilot March 8, 2026 03:36
Copilot AI left a comment


Pull request overview

Copilot reviewed 17 out of 17 changed files in this pull request and generated 1 comment.

Enable primary-key-based deduplication (upsert) for OFFLINE tables,
extending a capability previously limited to REALTIME tables. This
allows batch-ingested data to leverage the same upsert semantics.

Core changes:
- Add three-level comparison column fallback for resolving record
  conflicts: configured comparison columns → time column → segment
  creation/push time (via ConstantComparisonColumnReader)
- Extend OfflineTableDataManager with full upsert lifecycle: init,
  addSegment, replaceSegment, getSegmentContexts, and shutdown
- Validate that offline upsert tables have SegmentPartitionConfig
  to ensure partition-aware segment assignment
- Use zkMetadata-based partition ID lookup when available to avoid
  redundant ZK reads
- Guard TTL code paths against empty comparison columns to prevent
  IndexOutOfBoundsException in segment-creation-time mode

Refactoring:
- Move shared upsert methods (handleUpsert, replaceUpsertSegment,
  registerSegment, setZkOperationTimeIfAvailable, getSegmentContexts,
  isUpsertEnabled, getPartitionToPrimaryKeyCount) from
  RealtimeTableDataManager into BaseTableDataManager
- Add isUpsertEnabled(), getTableUpsertMetadataManager(), and
  getPartitionToPrimaryKeyCount() to the TableDataManager interface
  with default implementations
- Remove instanceof checks in PrimaryKeyCount, TablesResource, and
  SingleTableExecutionInfo in favor of interface methods

Testing:
- Add OfflineUpsertTableIntegrationTest covering dedup query results,
  skipUpsert option, and segment replacement
- Update TableConfigUtilsTest for new validation rules

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@xiangfu0 xiangfu0 force-pushed the feature/offline-table-upsert-support branch from 7b06e42 to e549738 Compare March 8, 2026 03:51
@xiangfu0 xiangfu0 requested a review from Copilot March 8, 2026 04:31
Copilot AI left a comment


Pull request overview

Copilot reviewed 17 out of 17 changed files in this pull request and generated 2 comments.

@xiangfu0 xiangfu0 merged commit 50ad111 into apache:master Mar 8, 2026
35 of 36 checks passed
@xiangfu0 xiangfu0 deleted the feature/offline-table-upsert-support branch March 8, 2026 06:36
@xiangfu0 xiangfu0 added ingestion Related to data ingestion pipeline dedup Changes related to realtime ingestion dedup handling real-time Related to realtime table ingestion and serving labels Mar 20, 2026
Labels

dedup Changes related to realtime ingestion dedup handling ingestion Related to data ingestion pipeline real-time Related to realtime table ingestion and serving upsert Related to upsert functionality

5 participants