From 6f2cd84ae856f77ea67a9c205a04864371d08f36 Mon Sep 17 00:00:00 2001 From: Sonam Mandal Date: Tue, 25 Feb 2025 13:49:43 -0800 Subject: [PATCH 1/3] Update the segment operation throttle defaults to Integer.MAX_VALUE --- .../utils/SegmentOperationsThrottlerTest.java | 72 ++++++++++--------- .../pinot/spi/utils/CommonConstants.java | 14 ++-- 2 files changed, 46 insertions(+), 40 deletions(-) diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentOperationsThrottlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentOperationsThrottlerTest.java index b560dd66028b..8662def34fd5 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentOperationsThrottlerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentOperationsThrottlerTest.java @@ -308,11 +308,13 @@ public void testServingQueriesDisabledWithAcquireRelease() ? Integer.parseInt( CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES) : Integer.parseInt(CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES); + // Default is too high: Integer.MAX_VALUE, take a large number of permits to ensure we don't block + int numPermitsToTake = 10000; // We set isServingQueries to false when the server is not yet ready to server queries. In this scenario ideally // preprocessing more segments is acceptable and cannot affect the query performance Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery); - for (int i = 0; i < defaultPermitsBeforeQuery; i++) { + for (int i = 0; i < numPermitsToTake; i++) { operationsThrottler.acquire(); Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - i - 1); @@ -322,12 +324,11 @@ public void testServingQueriesDisabledWithAcquireRelease() operationsThrottler.startServingQueries(); Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits); Assert.assertEquals(operationsThrottler.availablePermits(), - initialPermits - defaultPermitsBeforeQuery); + initialPermits - numPermitsToTake); - for (int i = 0; i < defaultPermitsBeforeQuery; i++) { + for (int i = 0; i < numPermitsToTake; i++) { operationsThrottler.release(); - Assert.assertEquals(operationsThrottler.availablePermits(), - (initialPermits - defaultPermitsBeforeQuery) + i + 1); + Assert.assertEquals(operationsThrottler.availablePermits(), (initialPermits - numPermitsToTake) + i + 1); } Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits); Assert.assertEquals(operationsThrottler.availablePermits(), initialPermits); @@ -340,11 +341,11 @@ public void testServingQueriesDisabledWithAcquireReleaseWithConfigIncrease() int initialPermits = 4; List segmentOperationsThrottlerList = new ArrayList<>(); segmentOperationsThrottlerList.add(new SegmentAllIndexPreprocessThrottler(initialPermits, Integer.parseInt( - CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES), false)); + CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES) - 5, false)); segmentOperationsThrottlerList.add(new SegmentStarTreePreprocessThrottler(initialPermits, Integer.parseInt( - CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES), false)); + CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES) - 5, false)); segmentOperationsThrottlerList.add(new SegmentDownloadThrottler(initialPermits, Integer.parseInt( - CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES), false)); + CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES) - 5, false)); for (BaseSegmentOperationsThrottler operationsThrottler : segmentOperationsThrottlerList) { int defaultPermitsBeforeQuery = operationsThrottler instanceof SegmentAllIndexPreprocessThrottler @@ -353,14 +354,16 @@ public void testServingQueriesDisabledWithAcquireReleaseWithConfigIncrease() ? Integer.parseInt( CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES) : Integer.parseInt(CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES); + // Default is too high: Integer.MAX_VALUE, take a large number of permits to ensure we don't block + int numPermitsToTake = 10000; // We set isServingQueries to false when the server is not yet ready to server queries. In this scenario ideally // preprocessing more segments is acceptable and cannot affect the query performance - Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); - Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery); - for (int i = 0; i < defaultPermitsBeforeQuery; i++) { + Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery - 5); + Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - 5); + for (int i = 0; i < numPermitsToTake; i++) { operationsThrottler.acquire(); - Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); - Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - i - 1); + Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery - 5); + Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - i - 1 - 5); } // Double the permits for before serving queries config @@ -370,29 +373,29 @@ public void testServingQueriesDisabledWithAcquireReleaseWithConfigIncrease() : operationsThrottler instanceof SegmentStarTreePreprocessThrottler ? CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES : CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES, - String.valueOf(defaultPermitsBeforeQuery * 2)); + String.valueOf(defaultPermitsBeforeQuery)); operationsThrottler.onChange(updatedClusterConfigs.keySet(), updatedClusterConfigs); - Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery * 2); - // We doubled permits but took all of the previous ones - Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery); + Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); + // We increased permits but took some before the increase + Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - numPermitsToTake); - // Take remaining permits - for (int i = 0; i < defaultPermitsBeforeQuery; i++) { + // Take more permits + for (int i = 0; i < numPermitsToTake; i++) { operationsThrottler.acquire(); - Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery * 2); - Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - i - 1); + Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); + Assert.assertEquals(operationsThrottler.availablePermits(), + defaultPermitsBeforeQuery - numPermitsToTake - i - 1); } // Once the server is ready to server queries, we should reset the throttling configurations to be as configured operationsThrottler.startServingQueries(); Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits); Assert.assertEquals(operationsThrottler.availablePermits(), - initialPermits - (defaultPermitsBeforeQuery * 2)); + initialPermits - (numPermitsToTake * 2)); - for (int i = 0; i < defaultPermitsBeforeQuery * 2; i++) { + for (int i = 0; i < numPermitsToTake * 2; i++) { operationsThrottler.release(); - Assert.assertEquals(operationsThrottler.availablePermits(), - (initialPermits - defaultPermitsBeforeQuery * 2) + i + 1); + Assert.assertEquals(operationsThrottler.availablePermits(), (initialPermits - numPermitsToTake * 2) + i + 1); } Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits); Assert.assertEquals(operationsThrottler.availablePermits(), initialPermits); @@ -418,11 +421,13 @@ public void testServingQueriesDisabledWithAcquireReleaseWithConfigDecrease() ? Integer.parseInt( CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES) : Integer.parseInt(CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES); + // Default is too high: Integer.MAX_VALUE, take a large number of permits to ensure we don't block + int numPermitsToTake = 10000; // We set isServingQueries to false when the server is not yet ready to server queries. In this scenario ideally // preprocessing more segments is acceptable and cannot affect the query performance Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery); - for (int i = 0; i < defaultPermitsBeforeQuery; i++) { + for (int i = 0; i < numPermitsToTake; i++) { operationsThrottler.acquire(); Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - i - 1); @@ -430,27 +435,26 @@ public void testServingQueriesDisabledWithAcquireReleaseWithConfigDecrease() // Half the permits for before serving queries config Map updatedClusterConfigs = new HashMap<>(); + int newDefaultPermits = defaultPermitsBeforeQuery / 2; updatedClusterConfigs.put(operationsThrottler instanceof SegmentAllIndexPreprocessThrottler ? CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES : operationsThrottler instanceof SegmentStarTreePreprocessThrottler ? CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES : CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES, - String.valueOf(defaultPermitsBeforeQuery / 2)); + String.valueOf(newDefaultPermits)); operationsThrottler.onChange(updatedClusterConfigs.keySet(), updatedClusterConfigs); - Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery / 2); + Assert.assertEquals(operationsThrottler.totalPermits(), newDefaultPermits); // We doubled permits but took all of the previous ones - Assert.assertEquals(operationsThrottler.availablePermits(), -(defaultPermitsBeforeQuery / 2)); + Assert.assertEquals(operationsThrottler.availablePermits(), newDefaultPermits - numPermitsToTake); // Once the server is ready to server queries, we should reset the throttling configurations to be as configured operationsThrottler.startServingQueries(); Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits); - Assert.assertEquals(operationsThrottler.availablePermits(), - initialPermits - defaultPermitsBeforeQuery); + Assert.assertEquals(operationsThrottler.availablePermits(), initialPermits - numPermitsToTake); - for (int i = 0; i < defaultPermitsBeforeQuery; i++) { + for (int i = 0; i < numPermitsToTake; i++) { operationsThrottler.release(); - Assert.assertEquals(operationsThrottler.availablePermits(), - (initialPermits - defaultPermitsBeforeQuery) + i + 1); + Assert.assertEquals(operationsThrottler.availablePermits(), (initialPermits - numPermitsToTake) + i + 1); } Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits); Assert.assertEquals(operationsThrottler.availablePermits(), initialPermits); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 3933187f7f65..e01d4badd63a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -252,26 +252,28 @@ public static class Instance { // Preprocess throttle configs public static final String CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM = "pinot.server.max.segment.preprocess.parallelism"; - public static final String DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM = String.valueOf(100); + public static final String DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM = String.valueOf(Integer.MAX_VALUE); // Before serving queries is enabled, we should use a higher preprocess parallelism to process segments faster public static final String CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = "pinot.server.max.segment.preprocess.parallelism.before.serving.queries"; // Use the below default before enabling queries on the server - public static final String DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = String.valueOf(100); + public static final String DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = + String.valueOf(Integer.MAX_VALUE); // Preprocess throttle config specifically for StarTree index rebuild public static final String CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM = "pinot.server.max.segment.startree.preprocess.parallelism"; - public static final String DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM = String.valueOf(100); + public static final String DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM = String.valueOf(Integer.MAX_VALUE); public static final String CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = "pinot.server.max.segment.startree.preprocess.parallelism.before.serving.queries"; public static final String DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = - String.valueOf(100); + String.valueOf(Integer.MAX_VALUE); public static final String CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM = "pinot.server.max.segment.download.parallelism"; - public static final String DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM = "100"; + public static final String DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM = String.valueOf(Integer.MAX_VALUE); public static final String CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES = "pinot.server.max.segment.download.parallelism.before.serving.queries"; - public static final String DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES = "100"; + public static final String DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES = + String.valueOf(Integer.MAX_VALUE); } public static class Broker { From 3872b718f7e9a2ed21905c2bc2e75f03502e1f7c Mon Sep 17 00:00:00 2001 From: Sonam Mandal Date: Tue, 25 Feb 2025 14:24:15 -0800 Subject: [PATCH 2/3] Trigger Build From 545bb7c40d63fc589fe19f190727f980526ad9a8 Mon Sep 17 00:00:00 2001 From: Sonam Mandal Date: Tue, 25 Feb 2025 15:15:56 -0800 Subject: [PATCH 3/3] Address review comments --- .../local/utils/SegmentOperationsThrottlerTest.java | 9 ++++++--- .../java/org/apache/pinot/spi/utils/CommonConstants.java | 7 ++++++- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentOperationsThrottlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentOperationsThrottlerTest.java index 8662def34fd5..1ceff184af0c 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentOperationsThrottlerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentOperationsThrottlerTest.java @@ -308,7 +308,8 @@ public void testServingQueriesDisabledWithAcquireRelease() ? Integer.parseInt( CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES) : Integer.parseInt(CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES); - // Default is too high: Integer.MAX_VALUE, take a large number of permits to ensure we don't block + // Default is too high: Integer.MAX_VALUE, take a limited number of permits so that the test doesn't take too + // long to finish int numPermitsToTake = 10000; // We set isServingQueries to false when the server is not yet ready to server queries. In this scenario ideally // preprocessing more segments is acceptable and cannot affect the query performance @@ -354,7 +355,8 @@ public void testServingQueriesDisabledWithAcquireReleaseWithConfigIncrease() ? Integer.parseInt( CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES) : Integer.parseInt(CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES); - // Default is too high: Integer.MAX_VALUE, take a large number of permits to ensure we don't block + // Default is too high: Integer.MAX_VALUE, take a limited number of permits so that the test doesn't take too + // long to finish int numPermitsToTake = 10000; // We set isServingQueries to false when the server is not yet ready to server queries. In this scenario ideally // preprocessing more segments is acceptable and cannot affect the query performance @@ -421,7 +423,8 @@ public void testServingQueriesDisabledWithAcquireReleaseWithConfigDecrease() ? Integer.parseInt( CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES) : Integer.parseInt(CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES); - // Default is too high: Integer.MAX_VALUE, take a large number of permits to ensure we don't block + // Default is too high: Integer.MAX_VALUE, take a limited number of permits so that the test doesn't take too + // long to finish int numPermitsToTake = 10000; // We set isServingQueries to false when the server is not yet ready to server queries. In this scenario ideally // preprocessing more segments is acceptable and cannot affect the query performance diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index e01d4badd63a..9ba8b4f85fdd 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -252,26 +252,31 @@ public static class Instance { // Preprocess throttle configs public static final String CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM = "pinot.server.max.segment.preprocess.parallelism"; + // Setting to Integer.MAX_VALUE to effectively disable throttling by default public static final String DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM = String.valueOf(Integer.MAX_VALUE); // Before serving queries is enabled, we should use a higher preprocess parallelism to process segments faster public static final String CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = "pinot.server.max.segment.preprocess.parallelism.before.serving.queries"; - // Use the below default before enabling queries on the server + // Setting the before serving queries to Integer.MAX_VALUE to effectively disable throttling by default public static final String DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = String.valueOf(Integer.MAX_VALUE); // Preprocess throttle config specifically for StarTree index rebuild public static final String CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM = "pinot.server.max.segment.startree.preprocess.parallelism"; + // Setting to Integer.MAX_VALUE to effectively disable throttling by default public static final String DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM = String.valueOf(Integer.MAX_VALUE); public static final String CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = "pinot.server.max.segment.startree.preprocess.parallelism.before.serving.queries"; + // Setting the before serving queries to Integer.MAX_VALUE to effectively disable throttling by default public static final String DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = String.valueOf(Integer.MAX_VALUE); public static final String CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM = "pinot.server.max.segment.download.parallelism"; + // Setting to Integer.MAX_VALUE to effectively disable throttling by default public static final String DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM = String.valueOf(Integer.MAX_VALUE); public static final String CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES = "pinot.server.max.segment.download.parallelism.before.serving.queries"; + // Setting the before serving queries to Integer.MAX_VALUE to effectively disable throttling by default public static final String DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES = String.valueOf(Integer.MAX_VALUE); }