Add upsert support for offline tables #17789
Codecov Report
Coverage diff for #17789 against master:
| Metric | master | #17789 | +/- |
|---|---|---|---|
| Coverage | 63.29% | 63.25% | -0.05% |
| Complexity | 1466 | 1466 | |
| Files | 3190 | 3190 | |
| Lines | 191904 | 191963 | +59 |
| Branches | 29390 | 29410 | +20 |
| Hits | 121474 | 121428 | -46 |
| Misses | 60940 | 61027 | +87 |
| Partials | 9490 | 9508 | +18 |
Pull request overview
Extends Pinot’s upsert (primary-key deduplication) feature to OFFLINE tables, enabling batch-ingested segments to participate in upsert semantics, including comparison-column fallback logic and server-side introspection APIs.
Changes:
- Enable/validate upsert & dedup configurations for OFFLINE tables and allow empty comparison columns (fallback behavior).
- Add comparison fallback down to segment creation time and wire segment-context/upsert-metadata paths for offline querying.
- Extend server APIs/metrics paths to surface primary-key counts for offline upsert tables and add an offline upsert integration test.
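The fallback order mentioned in the list above (configured comparison columns, then the time column, then an empty list that stands for segment creation time) can be sketched roughly as follows. This is only an illustration under stated assumptions: the class `ComparisonColumnFallback` and its `resolve` method are hypothetical names, not the code in `BaseTableUpsertMetadataManager`.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical helper for illustration; not part of Pinot. */
final class ComparisonColumnFallback {
  private ComparisonColumnFallback() {
  }

  /**
   * Picks the columns used to order records that share a primary key.
   * An empty result means "compare by segment creation time instead".
   */
  static List<String> resolve(List<String> configuredColumns, String timeColumn) {
    // Level 1: explicitly configured comparison columns win.
    if (configuredColumns != null && !configuredColumns.isEmpty()) {
      return List.copyOf(configuredColumns);
    }
    // Level 2: fall back to the table's time column, if one is set.
    if (timeColumn != null && !timeColumn.isEmpty()) {
      return List.of(timeColumn);
    }
    // Level 3: no column available; the caller uses segment creation time.
    return Collections.emptyList();
  }
}
```

Callers would then treat an empty list as the signal to read a constant, per-segment comparison value rather than a column.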
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java | Relaxes REALTIME-only validation and adjusts validations for OFFLINE upsert/dedup. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java | Adds comparison-column fallback: configured → time column → empty (segment creation time). |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java | Allows empty comparison-column list (non-null requirement only). |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java | Adds constant comparison-value reader to support segment-creation-time comparison. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java | Creates RecordInfoReader with segment creation time when comparison columns are empty; adjusts TTL gating. |
| pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java | Implements upsert lifecycle for offline tables (init/add/replace/segment contexts/shutdown). |
| pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java | Includes offline upsert tables in the “newly added segments” and segment-context path. |
| pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java | Exposes offline partition→PK-count in table metadata response. |
| pinot-server/src/main/java/org/apache/pinot/server/api/resources/PrimaryKeyCount.java | Counts PKs for offline upsert tables on the server instance. |
| pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java | Updates tests for relaxed OFFLINE validation and updated error messages. |
| pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineUpsertTableIntegrationTest.java | Adds end-to-end integration test coverage for offline upsert. |
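The "constant comparison-value reader" row in the table above refers to a reader that hands back the same comparison value for every document in a segment. A minimal sketch of that idea follows; the interface and class names are hypothetical stand-ins, and the real `UpsertUtils` types may look different.

```java
/** Hypothetical reader abstraction for illustration; not Pinot's actual interface. */
interface ComparisonValueReaderSketch {
  long getComparisonValue(int docId);
}

/**
 * Returns one fixed value (for example, the segment creation time in millis)
 * for every document, so all rows of a segment compare equal to each other
 * and are ordered against other segments only by that per-segment value.
 */
final class ConstantComparisonValueReaderSketch implements ComparisonValueReaderSketch {
  private final long _value;

  ConstantComparisonValueReaderSketch(long value) {
    _value = value;
  }

  @Override
  public long getComparisonValue(int docId) {
    // docId is intentionally ignored: the value is constant per segment.
    return _value;
  }
}
```

The design choice here is that a later segment overrides an earlier one as a whole, since no per-row ordering information exists in this mode.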
Do we intend to support hybrid tables with upsert? We have had this idea, but we never had a strong requirement at Uber for offline upsert tables.
I don't think we need a hybrid upsert table. Technically it's equivalent to a realtime upsert table, since you can push segments to a realtime table.
Jackie-Jiang
left a comment
We are unifying the behavior of RT and OFFLINE tables. Let's pull the common parts into a base class and the common API into an interface.
Jackie-Jiang
left a comment
LGTM. Please do another round of checks to ensure there is no behavior change on the RT side.
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:370
- TTL handling is only partially guarded for the empty-comparison-columns (segment-creation-time) mode. While `doAddSegment()`/`doPreloadSegment()`/`doReplaceSegment()` now skip TTL when `_comparisonColumns` is empty, `removeSegment()` still calls `isOutOfMetadataTTL(segment)`, which unconditionally dereferences `_comparisonColumns.get(0)` via `getMaxComparisonValue()`. This can throw at runtime for TTL-enabled tables with empty comparison columns. Either disallow TTL when comparison columns are empty at validation time, or update TTL-related helpers (including `isOutOfMetadataTTL(IndexSegment)`) to handle the empty-comparison-columns mode safely.
if (isTTLEnabled() && !_comparisonColumns.isEmpty()) {
double maxComparisonValue = getMaxComparisonValue(segment);
_largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, maxComparisonValue));
if (isOutOfMetadataTTL(maxComparisonValue) && skipAddSegmentOutOfTTL(segment)) {
return;
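One way to apply the suggestion in the comment above is to put the empty-comparison-columns check inside the TTL helper itself, so that every caller is protected. The sketch below is a simplified stand-in, not the actual `BasePartitionUpsertMetadataManager` code: the class name, fields, and the "older than largest seen value minus TTL" rule are assumptions made for illustration.

```java
import java.util.List;

/** Hypothetical, simplified TTL bookkeeping; not Pinot's implementation. */
final class TtlGuardSketch {
  private final List<String> _comparisonColumns;
  private final double _metadataTtl;
  private double _largestSeenComparisonValue = Double.NEGATIVE_INFINITY;

  TtlGuardSketch(List<String> comparisonColumns, double metadataTtl) {
    _comparisonColumns = comparisonColumns;
    _metadataTtl = metadataTtl;
  }

  /** TTL only makes sense when there is a comparison column to read values from. */
  boolean isTtlApplicable() {
    return _metadataTtl > 0 && !_comparisonColumns.isEmpty();
  }

  void observe(double comparisonValue) {
    _largestSeenComparisonValue = Math.max(_largestSeenComparisonValue, comparisonValue);
  }

  /** Guarded helper: never touches the comparison columns when the list is empty. */
  boolean isOutOfMetadataTtl(double maxComparisonValue) {
    if (!isTtlApplicable()) {
      return false;
    }
    return maxComparisonValue < _largestSeenComparisonValue - _metadataTtl;
  }
}
```

With the guard in one place, a remove path that calls the helper behaves the same as the add, preload, and replace paths when comparison columns are empty.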
Enable primary-key-based deduplication (upsert) for OFFLINE tables, extending a capability previously limited to REALTIME tables. This allows batch-ingested data to leverage the same upsert semantics.

Core changes:
- Add three-level comparison column fallback for resolving record conflicts: configured comparison columns → time column → segment creation/push time (via ConstantComparisonColumnReader)
- Extend OfflineTableDataManager with full upsert lifecycle: init, addSegment, replaceSegment, getSegmentContexts, and shutdown
- Validate that offline upsert tables have SegmentPartitionConfig to ensure partition-aware segment assignment
- Use zkMetadata-based partition ID lookup when available to avoid redundant ZK reads
- Guard TTL code paths against empty comparison columns to prevent IndexOutOfBoundsException in segment-creation-time mode

Refactoring:
- Move shared upsert methods (handleUpsert, replaceUpsertSegment, registerSegment, setZkOperationTimeIfAvailable, getSegmentContexts, isUpsertEnabled, getPartitionToPrimaryKeyCount) from RealtimeTableDataManager into BaseTableDataManager
- Add isUpsertEnabled(), getTableUpsertMetadataManager(), and getPartitionToPrimaryKeyCount() to the TableDataManager interface with default implementations
- Remove instanceof checks in PrimaryKeyCount, TablesResource, and SingleTableExecutionInfo in favor of interface methods

Testing:
- Add OfflineUpsertTableIntegrationTest covering dedup query results, skipUpsert option, and segment replacement
- Update TableConfigUtilsTest for new validation rules

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
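The refactoring described in the commit message replaces `instanceof` checks with interface methods that have default implementations. A rough sketch of that pattern is below. All type names carry a `Sketch` suffix because they are hypothetical stand-ins; the real `TableDataManager` interface has many more methods and its exact signatures are not reproduced here.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical slice of a table data manager interface. */
interface TableDataManagerSketch {
  // Defaults describe a table without upsert, so existing implementations need no change.
  default boolean isUpsertEnabled() {
    return false;
  }

  default Map<Integer, Long> getPartitionToPrimaryKeyCount() {
    return Collections.emptyMap();
  }
}

/** A table type that does not override anything, i.e. no upsert. */
final class PlainTableSketch implements TableDataManagerSketch {
}

/** A table type (realtime or offline) that has upsert enabled. */
final class UpsertTableSketch implements TableDataManagerSketch {
  private final Map<Integer, Long> _partitionToPrimaryKeyCount;

  UpsertTableSketch(Map<Integer, Long> partitionToPrimaryKeyCount) {
    _partitionToPrimaryKeyCount = partitionToPrimaryKeyCount;
  }

  @Override
  public boolean isUpsertEnabled() {
    return true;
  }

  @Override
  public Map<Integer, Long> getPartitionToPrimaryKeyCount() {
    return _partitionToPrimaryKeyCount;
  }
}

/** Caller code no longer needs to know the concrete table type. */
final class PrimaryKeyCountSketch {
  private PrimaryKeyCountSketch() {
  }

  static long totalPrimaryKeys(TableDataManagerSketch tableDataManager) {
    if (!tableDataManager.isUpsertEnabled()) {
      return 0L;
    }
    long total = 0L;
    for (long count : tableDataManager.getPartitionToPrimaryKeyCount().values()) {
      total += count;
    }
    return total;
  }
}
```

Because the caller depends only on the interface, adding upsert to another table type does not require touching the server-side counting code again.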
Summary
Extends Apache Pinot's upsert (primary-key deduplication) to OFFLINE tables, a capability previously limited to REALTIME tables. This enables batch-ingested data to leverage primary-key-based deduplication with the same semantics.
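For readers unfamiliar with the semantics, primary-key deduplication can be pictured as keeping one "winning" row per key, chosen by a comparison value. The following is a toy, in-memory sketch and not how Pinot tracks valid documents internally; the class names are hypothetical, and letting the incoming row win on ties is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy illustration of upsert semantics; not Pinot's metadata manager. */
final class UpsertSketch {
  static final class Row {
    final String primaryKey;
    final long comparisonValue;
    final String payload;

    Row(String primaryKey, long comparisonValue, String payload) {
      this.primaryKey = primaryKey;
      this.comparisonValue = comparisonValue;
      this.payload = payload;
    }
  }

  private final Map<String, Row> _latestByKey = new HashMap<>();

  /** Keeps the row with the larger comparison value; on a tie the incoming row wins (assumed). */
  void add(Row incoming) {
    _latestByKey.merge(incoming.primaryKey, incoming,
        (existing, candidate) -> candidate.comparisonValue >= existing.comparisonValue ? candidate : existing);
  }

  Row get(String primaryKey) {
    return _latestByKey.get(primaryKey);
  }

  int size() {
    return _latestByKey.size();
  }
}
```

A query over such a table sees at most one row per primary key, regardless of how many batch segments contained that key.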
Motivation
Users with batch data pipelines need deduplication on primary keys across segment uploads. Previously this required a REALTIME table with segment push, which adds unnecessary streaming infrastructure overhead. Offline upsert allows a simpler batch-only architecture while preserving upsert guarantees.
Key design decisions
- Offline upsert tables require `segmentPartitionConfig` so all segments of a partition land on the same server. This is validated at table creation time.
- The upsert methods shared by `RealtimeTableDataManager` and the new `OfflineTableDataManager` are now consolidated into `BaseTableDataManager`, and `TableDataManager` interface methods replace `instanceof` checks.
Changes
Core: Offline upsert support
| File | Change |
|---|---|
| `OfflineTableDataManager` | |
| `BaseTableUpsertMetadataManager` | |
| `UpsertUtils` | `ConstantComparisonColumnReader` for segment-creation-time-based comparison |
| `UpsertContext` | |
| `BasePartitionUpsertMetadataManager` | `createRecordInfoReader()` helper with segment creation time fallback; guard TTL paths against empty comparison columns; prefer zkPushTime for offline tables in `getAuthoritativeUpdateOrCreationTime()` |
| `SegmentMetadataImpl` | `zkPushTime` field for offline upsert consistency |
Refactoring: Shared upsert logic
| File | Change |
|---|---|
| `BaseTableDataManager` | `handleUpsert`, `replaceUpsertSegment`, `registerSegment`, `setZkOperationTimeIfAvailable`, `getSegmentContexts`, `isUpsertEnabled`, `getPartitionToPrimaryKeyCount` from `RealtimeTableDataManager` |
| `RealtimeTableDataManager` | `_tableUpsertMetadataManager` field (now in base) |
| `TableDataManager` | `isUpsertEnabled()`, `getTableUpsertMetadataManager()`, `getPartitionToPrimaryKeyCount()` with default implementations |
| `SingleTableExecutionInfo` | `tableDataManager.isUpsertEnabled()` instead of `instanceof RealtimeTableDataManager`; add null-safety `Preconditions.checkState` |
| `PrimaryKeyCount` | `tableDataManager.getPartitionToPrimaryKeyCount()` instead of `instanceof` cast |
| `TablesResource` | `PrimaryKeyCount` |
Validation
| File | Change |
|---|---|
| `TableConfigUtils` | `segmentPartitionConfig` for offline upsert; scope tenant tag override and COMPLETED instance partition checks to REALTIME only; validate TTL requires comparison column or time column |
| `TableConfigUtilsTest` | |
Testing
| File | Change |
|---|---|
| `OfflineUpsertTableIntegrationTest` | `skipUpsert` option, segment replacement with updated records |
Configuration example
{
  "tableName": "myTable_OFFLINE",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "timeColumnName": "timestampInEpoch",
    "replicaGroupStrategyConfig": {
      "partitionColumn": "userId",
      "numInstancesPerPartition": 1
    }
  },
  "tableIndexConfig": {
    "segmentPartitionConfig": {
      "columnPartitionMap": {
        "userId": {
          "functionName": "Murmur",
          "numPartitions": 4
        }
      }
    }
  },
  "upsertConfig": {
    "mode": "FULL"
  },
  "routing": {
    "instanceSelectorType": "strictReplicaGroup"
  }
}
Test plan
- `TableConfigUtilsTest`: 48 tests pass (updated expected error messages for OFFLINE tables)
- `BasePartitionUpsertMetadataManagerTest`: 14 tests pass
- `ConcurrentMapPartitionUpsertMetadataManagerTest`: 24 tests pass
- `ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest`: 12 tests pass
- `OfflineUpsertTableIntegrationTest`: new integration test validates end-to-end offline upsert (dedup query results, segment replacement, skipUpsert option)
🤖 Generated with Claude Code
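As a closing illustration of the validation rule stated in this description (offline upsert tables must define `segmentPartitionConfig`), here is a minimal sketch. It deliberately avoids Pinot's `TableConfig` classes: the method signature, the plain `Map` standing in for the column partition map, and the error message are all hypothetical.

```java
import java.util.Map;

/** Hypothetical validation sketch; the real checks live in TableConfigUtils. */
final class OfflineUpsertValidationSketch {
  private OfflineUpsertValidationSketch() {
  }

  /**
   * @param tableType "OFFLINE" or "REALTIME"
   * @param upsertEnabled whether an upsert config with a non-NONE mode is present
   * @param columnToNumPartitions stand-in for segmentPartitionConfig.columnPartitionMap
   */
  static void validate(String tableType, boolean upsertEnabled, Map<String, Integer> columnToNumPartitions) {
    if (!upsertEnabled || !"OFFLINE".equals(tableType)) {
      return; // The rule sketched here only concerns offline upsert tables.
    }
    if (columnToNumPartitions == null || columnToNumPartitions.isEmpty()) {
      throw new IllegalStateException("Offline upsert table must define segmentPartitionConfig");
    }
  }
}
```

Rejecting the config up front keeps the failure at table creation time instead of surfacing later as duplicate rows spread across servers.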