Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,18 @@ static void validateUpsertAndDedupConfig(TableConfig tableConfig, Schema schema)
}
}

// Partial upsert and dedup tables must have enforceConsumptionInOrder enabled
boolean isPartialUpsert = tableConfig.getUpsertMode() == UpsertConfig.Mode.PARTIAL;
if (isPartialUpsert || isDedupEnabled) {
IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
StreamIngestionConfig streamIngestionConfig =
ingestionConfig != null ? ingestionConfig.getStreamIngestionConfig() : null;
Preconditions.checkState(
streamIngestionConfig != null && streamIngestionConfig.isEnforceConsumptionInOrder(),
"enforceConsumptionInOrder must be enabled for %s tables",
isPartialUpsert ? "partial upsert" : "dedup");
}

Preconditions.checkState(
tableConfig.getInstanceAssignmentConfigMap() == null || !tableConfig.getInstanceAssignmentConfigMap()
.containsKey(InstancePartitionsType.COMPLETED.name()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1956,12 +1956,18 @@ public void testValidateDedupConfig() {
StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Lists.newArrayList("myCol"), null,
Collections.singletonList(
new AggregationFunctionColumnPair(AggregationFunctionType.COUNT, "myCol").toColumnName()), null, 10);
StreamIngestionConfig streamIngestionConfigWithOrder = new StreamIngestionConfig(
List.of(streamConfigs));
streamIngestionConfigWithOrder.setEnforceConsumptionInOrder(true);
IngestionConfig ingestionConfigWithOrder = new IngestionConfig();
ingestionConfigWithOrder.setStreamIngestionConfig(streamIngestionConfigWithOrder);
tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
.setDedupConfig(new DedupConfig())
.setRoutingConfig(
new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.setStarTreeIndexConfigs(Lists.newArrayList(starTreeIndexConfig))
.setStreamConfigs(streamConfigs)
.setIngestionConfig(ingestionConfigWithOrder)
.build();
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);

Expand Down Expand Up @@ -1989,11 +1995,17 @@ public void testValidateDedupConfig() {
.addDateTime(TIME_COLUMN, FieldSpec.DataType.TIMESTAMP, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
.setPrimaryKeyColumns(Lists.newArrayList("myCol"))
.build();
StreamIngestionConfig timestampStreamIngestionConfig = new StreamIngestionConfig(
List.of(streamConfigs));
timestampStreamIngestionConfig.setEnforceConsumptionInOrder(true);
IngestionConfig timestampIngestionConfig = new IngestionConfig();
timestampIngestionConfig.setStreamIngestionConfig(timestampStreamIngestionConfig);
tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
.setDedupConfig(dedupConfig)
.setRoutingConfig(
new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.setStreamConfigs(streamConfigs)
.setIngestionConfig(timestampIngestionConfig)
.build();
// Should not throw an exception - TIMESTAMP is a valid type for dedupTimeColumn
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
Expand All @@ -2009,6 +2021,10 @@ public void testValidateInvalidDedupConfigs() {
.build();

Map<String, String> streamConfigs = getStreamConfigs();
StreamIngestionConfig invalidDedupStreamConfig = new StreamIngestionConfig(List.of(streamConfigs));
invalidDedupStreamConfig.setEnforceConsumptionInOrder(true);
IngestionConfig invalidDedupIngestionConfig = new IngestionConfig();
invalidDedupIngestionConfig.setStreamIngestionConfig(invalidDedupStreamConfig);
DedupConfig dedupConfig = new DedupConfig();
dedupConfig.setMetadataTTL(10);
TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
Expand All @@ -2017,6 +2033,7 @@ public void testValidateInvalidDedupConfigs() {
new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.setTimeColumnName(TIME_COLUMN)
.setStreamConfigs(streamConfigs)
.setIngestionConfig(invalidDedupIngestionConfig)
.build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
Expand Down Expand Up @@ -2083,6 +2100,11 @@ public void testValidateDedupConfigWithMd5DisabledAllowsUpsertModeNoneMd5() {
.addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
.setPrimaryKeyColumns(Lists.newArrayList("myCol"))
.build();
Map<String, String> streamConfigs = getStreamConfigs();
StreamIngestionConfig md5StreamConfig = new StreamIngestionConfig(List.of(streamConfigs));
md5StreamConfig.setEnforceConsumptionInOrder(true);
IngestionConfig md5IngestionConfig = new IngestionConfig();
md5IngestionConfig.setStreamIngestionConfig(md5StreamConfig);
DedupConfig dedupConfig = new DedupConfig();
dedupConfig.setHashFunction(HashFunction.NONE);
UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.NONE);
Expand All @@ -2092,7 +2114,8 @@ public void testValidateDedupConfigWithMd5DisabledAllowsUpsertModeNoneMd5() {
.setDedupConfig(dedupConfig)
.setRoutingConfig(
new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.setStreamConfigs(getStreamConfigs())
.setStreamConfigs(streamConfigs)
.setIngestionConfig(md5IngestionConfig)
.build();
try {
PinotMd5Mode.setPinotMd5Disabled(true);
Expand All @@ -2102,6 +2125,92 @@ public void testValidateDedupConfigWithMd5DisabledAllowsUpsertModeNoneMd5() {
}
}

/**
 * Exercises the enforceConsumptionInOrder check of {@code TableConfigUtils.validateUpsertAndDedupConfig} against
 * upsert tables: a partial upsert table is rejected without the flag and accepted with it, while a full upsert
 * table is accepted without the flag.
 */
@Test
public void testValidateEnforceConsumptionInOrderForPartialUpsert() {
  Schema pkSchema = new Schema.SchemaBuilder()
      .setSchemaName(TABLE_NAME)
      .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
      .setPrimaryKeyColumns(Lists.newArrayList("myCol"))
      .build();
  Map<String, String> streamProps = getStreamConfigs();

  UpsertConfig partialMode = new UpsertConfig(UpsertConfig.Mode.PARTIAL);
  partialMode.setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.OVERWRITE);

  // Case 1: partial upsert, no ingestion config at all -> validation must reject the table.
  TableConfig partialWithoutOrdering = new TableConfigBuilder(TableType.REALTIME)
      .setTableName(TABLE_NAME)
      .setUpsertConfig(partialMode)
      .setRoutingConfig(
          new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
      .setStreamConfigs(streamProps)
      .build();
  try {
    TableConfigUtils.validateUpsertAndDedupConfig(partialWithoutOrdering, pkSchema);
    fail();
  } catch (IllegalStateException expected) {
    assertEquals(expected.getMessage(), "enforceConsumptionInOrder must be enabled for partial upsert tables");
  }

  // Case 2: partial upsert, flag switched on through the stream ingestion config -> validation must accept.
  StreamIngestionConfig orderedStreamIngestion = new StreamIngestionConfig(List.of(streamProps));
  orderedStreamIngestion.setEnforceConsumptionInOrder(true);
  IngestionConfig orderedIngestion = new IngestionConfig();
  orderedIngestion.setStreamIngestionConfig(orderedStreamIngestion);
  TableConfig partialWithOrdering = new TableConfigBuilder(TableType.REALTIME)
      .setTableName(TABLE_NAME)
      .setUpsertConfig(partialMode)
      .setRoutingConfig(
          new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
      .setStreamConfigs(streamProps)
      .setIngestionConfig(orderedIngestion)
      .build();
  TableConfigUtils.validateUpsertAndDedupConfig(partialWithOrdering, pkSchema);

  // Case 3: full upsert, no ingestion config -> the ordering requirement does not apply, validation must accept.
  UpsertConfig fullMode = new UpsertConfig(UpsertConfig.Mode.FULL);
  TableConfig fullWithoutOrdering = new TableConfigBuilder(TableType.REALTIME)
      .setTableName(TABLE_NAME)
      .setUpsertConfig(fullMode)
      .setRoutingConfig(
          new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
      .setStreamConfigs(streamProps)
      .build();
  TableConfigUtils.validateUpsertAndDedupConfig(fullWithoutOrdering, pkSchema);
}

/**
 * Exercises the enforceConsumptionInOrder check of {@code TableConfigUtils.validateUpsertAndDedupConfig} against
 * dedup tables: the table is rejected when no ingestion config is supplied and accepted once the flag is enabled
 * on the stream ingestion config.
 */
@Test
public void testValidateEnforceConsumptionInOrderForDedup() {
  Schema pkSchema = new Schema.SchemaBuilder()
      .setSchemaName(TABLE_NAME)
      .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
      .setPrimaryKeyColumns(Lists.newArrayList("myCol"))
      .build();
  Map<String, String> streamProps = getStreamConfigs();

  // Case 1: dedup table with no ingestion config -> validation must reject the table.
  TableConfig dedupWithoutOrdering = new TableConfigBuilder(TableType.REALTIME)
      .setTableName(TABLE_NAME)
      .setDedupConfig(new DedupConfig())
      .setRoutingConfig(
          new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
      .setStreamConfigs(streamProps)
      .build();
  try {
    TableConfigUtils.validateUpsertAndDedupConfig(dedupWithoutOrdering, pkSchema);
    fail();
  } catch (IllegalStateException expected) {
    assertEquals(expected.getMessage(), "enforceConsumptionInOrder must be enabled for dedup tables");
  }

  // Case 2: same dedup table, flag switched on through the stream ingestion config -> validation must accept.
  StreamIngestionConfig orderedStreamIngestion = new StreamIngestionConfig(List.of(streamProps));
  orderedStreamIngestion.setEnforceConsumptionInOrder(true);
  IngestionConfig orderedIngestion = new IngestionConfig();
  orderedIngestion.setStreamIngestionConfig(orderedStreamIngestion);
  TableConfig dedupWithOrdering = new TableConfigBuilder(TableType.REALTIME)
      .setTableName(TABLE_NAME)
      .setDedupConfig(new DedupConfig())
      .setRoutingConfig(
          new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
      .setStreamConfigs(streamProps)
      .setIngestionConfig(orderedIngestion)
      .build();
  TableConfigUtils.validateUpsertAndDedupConfig(dedupWithOrdering, pkSchema);
}

@Test
public void testValidateUpsertConfig() {
Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
Expand Down
Loading