From 80a356b6817e600c6ebd4ca4290578b0aa92b5c9 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Wed, 27 May 2026 19:19:57 +0530 Subject: [PATCH 1/6] PHOENIX-7863 Add replication consistency point guard to compaction On standby clusters, compaction can prematurely drop delete markers newer than the replication consistency point, causing permanent stale data. This adds a guard that floors maxLookbackWindowStart to the minimum consistency point across all HA groups when replication replay is enabled. Enabled via phoenix.replication.compaction.guard.enabled (default true), active only when phoenix.replication.replay.enabled is also true. Falls back to retaining all delete markers if the consistency point is unavailable. --- .../apache/phoenix/query/QueryServices.java | 2 + .../phoenix/query/QueryServicesOptions.java | 2 + .../coprocessor/CompactionScanner.java | 50 ++++ .../reader/ReplicationLogReplayService.java | 15 +- .../CompactionReplicationGuardDisabledIT.java | 151 ++++++++++ .../reader/CompactionReplicationGuardIT.java | 263 ++++++++++++++++++ ...CompactionScannerReplicationGuardTest.java | 99 +++++++ 7 files changed, 580 insertions(+), 2 deletions(-) create mode 100644 phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java create mode 100644 phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java create mode 100644 phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionScannerReplicationGuardTest.java diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index 72075853aa8..1609d891582 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -644,6 +644,8 @@ public interface QueryServices extends SQLCloseable { String SYNCHRONOUS_REPLICATION_ENABLED = "phoenix.synchronous.replication.enabled"; + String REPLICATION_COMPACTION_GUARD_ENABLED = "phoenix.replication.compaction.guard.enabled"; + // HA Group Store sync job interval in seconds String HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = "phoenix.ha.group.store.sync.interval.seconds"; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 339b6763a45..997f91baafb 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -514,6 +514,8 @@ public class QueryServicesOptions { public static final Boolean DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED = false; + public static final boolean DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED = true; + // Default HA Group Store sync job interval in seconds (15 minutes = 900 seconds) public static final int DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = 900; diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index f12dc77f724..6945599e604 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -81,6 +81,7 @@ import org.apache.phoenix.jdbc.PhoenixPreparedStatement; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.replication.reader.ReplicationLogReplayService; import org.apache.phoenix.schema.CompiledConditionalTTLExpression; import org.apache.phoenix.schema.CompiledTTLExpression; import org.apache.phoenix.schema.ConditionalTTLExpression; @@ -199,6 +200,16 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store, this.maxLookbackWindowStart = this.maxLookbackInMillis == 0 ? compactionTime : compactionTime - (this.maxLookbackInMillis + 1); + Configuration conf = env.getConfiguration(); + boolean replayEnabled = + conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, + ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED); + boolean guardEnabled = conf.getBoolean(QueryServices.REPLICATION_COMPACTION_GUARD_ENABLED, + QueryServicesOptions.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); + if (replayEnabled && guardEnabled) { + this.maxLookbackWindowStart = applyReplicationConsistencyGuard(this.maxLookbackWindowStart, + conf, tableName, columnFamilyName); + } ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor(); this.major = major && !forceMinorCompaction; this.minVersion = cfd.getMinVersions(); @@ -359,6 +370,45 @@ public static long getMaxLookbackInMillis(String tableName, String columnFamilyN : maxLookbackMap.get(tableName + CompactionScanner.SEPARATOR + columnFamilyName); } + /** + * Applies the replication replay consistency point as a floor on maxLookbackWindowStart. On + * standby clusters, this prevents compaction from dropping delete markers that have timestamps + * newer than the consistency point. + */ + @VisibleForTesting + static long applyReplicationConsistencyGuard(long currentMaxLookbackWindowStart, + Configuration conf, String tableName, String columnFamilyName) { + try { + long consistencyPoint = getConsistencyPointFromReplayService(conf); + return adjustMaxLookbackWindowStart(currentMaxLookbackWindowStart, consistencyPoint, + tableName, columnFamilyName); + } catch (Exception e) { + LOGGER + .warn("Replication guard enabled but consistency point unavailable for table={} store={}." + + " Retaining all delete markers.", tableName, columnFamilyName, e); + return 0L; + } + } + + @VisibleForTesting + static long getConsistencyPointFromReplayService(Configuration conf) throws Exception { + ReplicationLogReplayService service = ReplicationLogReplayService.getInstance(conf); + return service.getConsistencyPoint(); + } + + @VisibleForTesting + static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart, + long consistencyPoint, String tableName, String columnFamilyName) { + long adjusted = Math.min(currentMaxLookbackWindowStart, consistencyPoint); + if (adjusted < currentMaxLookbackWindowStart) { + LOGGER.info( + "Replication guard: table={} store={} maxLookbackWindowStart adjusted from {} to {}" + + " (consistencyPoint={})", + tableName, columnFamilyName, currentMaxLookbackWindowStart, adjusted, consistencyPoint); + } + return adjusted; + } + static class CellTimeComparator implements Comparator { public static final CellTimeComparator COMPARATOR = new CellTimeComparator(); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index 24d40faac77..3c3c92de68f 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -83,7 +84,7 @@ public class ReplicationLogReplayService { private ScheduledExecutorService scheduler; private volatile boolean isRunning = false; - private ReplicationLogReplayService(final Configuration conf) { + protected ReplicationLogReplayService(final Configuration conf) { this.conf = conf; } @@ -105,6 +106,16 @@ public static ReplicationLogReplayService getInstance(Configuration conf) throws return instance; } + @VisibleForTesting + public static void setInstanceForTesting(ReplicationLogReplayService testInstance) { + instance = testInstance; + } + + @VisibleForTesting + public static void resetInstanceForTesting() { + instance = null; + } + /** * Starts the replication log replay service by initializing the scheduler and scheduling periodic * replay operations for each HA Group. @@ -219,7 +230,7 @@ protected void stopReplicationReplay() throws IOException, SQLException { * @throws IOException if there's an error retrieving consistency points from replication groups * @throws SQLException if there's an error accessing HA group information */ - protected long getConsistencyPoint() throws IOException, SQLException { + public long getConsistencyPoint() throws IOException, SQLException { long consistencyPoint = EnvironmentEdgeManager.currentTime(); List replicationGroups = getReplicationGroups(); for (String replicationGroup : replicationGroups) { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java new file mode 100644 index 00000000000..40adc1126af --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.reader; + +import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY; +import static org.apache.phoenix.util.TestUtil.assertRawRowCount; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Map; +import org.apache.hadoop.hbase.TableName; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.replication.reader.ReplicationLogReplayService; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ManualEnvironmentEdge; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; + +/** + * Integration test verifying that the replication compaction guard does NOT interfere with normal + * compaction when explicitly disabled via configuration. + */ +@Category(NeedsOwnMiniClusterTest.class) +public class CompactionReplicationGuardDisabledIT extends BaseTest { + + private static final int MAX_LOOKBACK_AGE = 15; + private static final int ROWS_POPULATED = 2; + private ManualEnvironmentEdge injectEdge; + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map props = Maps.newHashMapWithExpectedSize(4); + props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); + props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); + props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, + Boolean.toString(true)); + props.put(QueryServices.REPLICATION_COMPACTION_GUARD_ENABLED, Boolean.toString(false)); + props.put("hbase.procedure.remote.dispatcher.delay.msec", "0"); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Before + public void beforeTest() throws Exception { + EnvironmentEdgeManager.reset(); + injectEdge = new ManualEnvironmentEdge(); + injectEdge.setValue(System.currentTimeMillis()); + EnvironmentEdgeManager.injectEdge(injectEdge); + } + + @After + public synchronized void afterTest() throws Exception { + ReplicationLogReplayService.resetInstanceForTesting(); + EnvironmentEdgeManager.reset(); + boolean refCountLeaked = isAnyStoreRefCountLeaked(); + assertFalse("refCount leaked", refCountLeaked); + } + + /** + * When guard is disabled, delete markers are purged normally by maxLookback even though the + * consistency point would have protected them if the guard were enabled. + */ + @Test(timeout = 120000L) + public void testGuardDisabledDeleteMarkersPurgedByMaxLookback() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + long beforeDeleteTime = EnvironmentEdgeManager.currentTimeMillis(); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Set consistency point BEFORE delete — guard would retain if enabled + long consistencyPoint = beforeDeleteTime - 1; + ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class); + when(mockService.getConsistencyPoint()).thenReturn(consistencyPoint); + ReplicationLogReplayService.setInstanceForTesting(mockService); + + // Advance past maxLookback + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); + + flush(dataTable); + majorCompact(dataTable); + + // Guard disabled — delete marker purged by maxLookback as normal + assertRawRowCount(conn, dataTable, ROWS_POPULATED - 1); + } + } + + private void flush(TableName table) throws IOException { + getUtility().getAdmin().flush(table); + } + + private void majorCompact(TableName table) throws Exception { + TestUtil.majorCompact(getUtility(), table); + } + + private void createTable(String tableName) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (id VARCHAR(10) NOT NULL PRIMARY KEY, val1 VARCHAR(10)," + + " val2 VARCHAR(10), val3 VARCHAR(10))"); + conn.commit(); + } + } + + private void populateTable(String tableName) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('a', 'ab', 'abc', 'abcd')"); + conn.commit(); + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('b', 'bc', 'bcd', 'bcde')"); + conn.commit(); + } + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java new file mode 100644 index 00000000000..bfcec12b646 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.reader; + +import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY; +import static org.apache.phoenix.util.TestUtil.assertRawRowCount; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Map; +import org.apache.hadoop.hbase.TableName; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.replication.reader.ReplicationLogReplayService; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ManualEnvironmentEdge; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; + +/** + * Integration tests for the replication consistency point compaction guard. Verifies that + * CompactionScanner retains delete markers with timestamps newer than the consistency point on + * clusters where replication replay is enabled. + */ +@Category(NeedsOwnMiniClusterTest.class) +public class CompactionReplicationGuardIT extends BaseTest { + + private static final int MAX_LOOKBACK_AGE = 15; + private static final int ROWS_POPULATED = 2; + private ManualEnvironmentEdge injectEdge; + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map props = Maps.newHashMapWithExpectedSize(4); + props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); + props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); + props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, + Boolean.toString(true)); + props.put(QueryServices.REPLICATION_COMPACTION_GUARD_ENABLED, Boolean.toString(true)); + props.put("hbase.procedure.remote.dispatcher.delay.msec", "0"); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Before + public void beforeTest() throws Exception { + EnvironmentEdgeManager.reset(); + injectEdge = new ManualEnvironmentEdge(); + injectEdge.setValue(System.currentTimeMillis()); + EnvironmentEdgeManager.injectEdge(injectEdge); + } + + @After + public synchronized void afterTest() throws Exception { + ReplicationLogReplayService.resetInstanceForTesting(); + EnvironmentEdgeManager.reset(); + boolean refCountLeaked = isAnyStoreRefCountLeaked(); + assertFalse("refCount leaked", refCountLeaked); + } + + /** + * Test 1: Guard retains delete markers that maxLookback would have purged. The consistency point + * is set BEFORE the delete timestamp, so the delete marker is newer than the consistency point + * and must be retained even after maxLookback window passes. + */ + @Test(timeout = 120000L) + public void testGuardRetainsDeleteMarkersNewerThanConsistencyPoint() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + long beforeDeleteTime = EnvironmentEdgeManager.currentTimeMillis(); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Set consistency point BEFORE the delete — meaning replay hasn't caught up to the delete + long consistencyPoint = beforeDeleteTime - 1; + injectMockConsistencyPoint(consistencyPoint); + + // Advance time past maxLookback window — without guard, marker would be purged + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); + + flush(dataTable); + majorCompact(dataTable); + + // Delete marker should be retained because its timestamp > consistencyPoint + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + } + } + + /** + * Test 2: Both maxLookback and guard allow purge. The consistency point has advanced past the + * delete marker AND maxLookback window has passed — marker should be purged. + */ + @Test(timeout = 120000L) + public void testDeleteMarkersPurgedWhenOlderThanBothConsistencyPointAndMaxLookback() + throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Advance time past maxLookback + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); + + // Set consistency point to current time — replay is fully caught up + long consistencyPoint = EnvironmentEdgeManager.currentTimeMillis(); + injectMockConsistencyPoint(consistencyPoint); + + flush(dataTable); + majorCompact(dataTable); + + // Delete marker should be purged — both guard and maxLookback agree + assertRawRowCount(conn, dataTable, ROWS_POPULATED - 1); + } + } + + /** + * Test 3: MaxLookback retains even when guard wouldn't. Consistency point has advanced past the + * delete, but we're still within the maxLookback window — marker retained by maxLookback. + */ + @Test(timeout = 120000L) + public void testMaxLookbackRetainsEvenWhenGuardAllowsPurge() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Set consistency point to current time — guard would allow purge + long consistencyPoint = EnvironmentEdgeManager.currentTimeMillis(); + injectMockConsistencyPoint(consistencyPoint); + + // Do NOT advance past maxLookback — still within the window + injectEdge.incrementValue(1); + + flush(dataTable); + majorCompact(dataTable); + + // Delete marker retained because still within maxLookback window + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + } + } + + /** + * Test 4: Guard fallback when consistency point unavailable — retains all delete markers. When + * the replay service throws an exception (e.g., not initialized), the guard falls back to + * retaining all markers to avoid data loss. + */ + @Test(timeout = 120000L) + public void testGuardFallbackRetainsAllWhenConsistencyPointUnavailable() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Inject a mock that throws — simulating uninitialized replay service + ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class); + when(mockService.getConsistencyPoint()) + .thenThrow(new IOException("HA groups not initialized")); + ReplicationLogReplayService.setInstanceForTesting(mockService); + + // Advance past maxLookback + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); + + flush(dataTable); + majorCompact(dataTable); + + // Fallback retains all — delete marker NOT purged despite maxLookback passing + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + } + } + + private void injectMockConsistencyPoint(long consistencyPoint) throws IOException, SQLException { + ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class); + when(mockService.getConsistencyPoint()).thenReturn(consistencyPoint); + ReplicationLogReplayService.setInstanceForTesting(mockService); + } + + private void flush(TableName table) throws IOException { + getUtility().getAdmin().flush(table); + } + + private void majorCompact(TableName table) throws Exception { + TestUtil.majorCompact(getUtility(), table); + } + + private void createTable(String tableName) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (id VARCHAR(10) NOT NULL PRIMARY KEY, val1 VARCHAR(10)," + + " val2 VARCHAR(10), val3 VARCHAR(10))"); + conn.commit(); + } + } + + private void populateTable(String tableName) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('a', 'ab', 'abc', 'abcd')"); + conn.commit(); + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('b', 'bc', 'bcd', 'bcde')"); + conn.commit(); + } + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionScannerReplicationGuardTest.java b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionScannerReplicationGuardTest.java new file mode 100644 index 00000000000..6fbc43bea2a --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionScannerReplicationGuardTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +/** + * Tests for the replication consistency point guard in CompactionScanner. Tests the pure adjustment + * logic (adjustMaxLookbackWindowStart) which is the core of the guard — ensuring + * maxLookbackWindowStart is floored to the consistency point. + */ +public class CompactionScannerReplicationGuardTest { + + private static final String TABLE_NAME = "TEST_TABLE"; + private static final String CF_NAME = "0"; + + @Test + public void testAdjustsWindowWhenConsistencyPointIsLower() { + long maxLookbackWindowStart = 1000000L; + long consistencyPoint = 500000L; + + long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(consistencyPoint, result); + } + + @Test + public void testNoChangeWhenConsistencyPointIsHigher() { + long maxLookbackWindowStart = 500000L; + long consistencyPoint = 1000000L; + + long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(maxLookbackWindowStart, result); + } + + @Test + public void testNoChangeWhenConsistencyPointEqualsWindowStart() { + long maxLookbackWindowStart = 500000L; + long consistencyPoint = 500000L; + + long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(maxLookbackWindowStart, result); + } + + @Test + public void testConsistencyPointAtZeroRetainsAll() { + long maxLookbackWindowStart = 1000000L; + long consistencyPoint = 0L; + + long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(0L, result); + } + + @Test + public void testLargeTimestampsNoAdjustmentNeeded() { + long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; + long consistencyPoint = System.currentTimeMillis() - 120000L; + + long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(maxLookbackWindowStart, result); + } + + @Test + public void testConsistencyPointFarInPastPushesWindowBack() { + long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; + long consistencyPoint = System.currentTimeMillis() - 604800000L; + + long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(consistencyPoint, result); + } +} From 601debd047e3cc1dda28ded4df22ec2c0b6b18f5 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Wed, 27 May 2026 20:07:41 +0530 Subject: [PATCH 2/6] PHOENIX-7863 Move compaction guard logic to ReplicationLogReplayService Move applyReplicationConsistencyGuard and adjustMaxLookbackWindowStart from CompactionScanner to ReplicationLogReplayService where the consistency point logic belongs. Relocate unit test to replication.reader package accordingly. --- .../coprocessor/CompactionScanner.java | 43 ++----------------- .../reader/ReplicationLogReplayService.java | 32 ++++++++++++++ .../ReplicationCompactionGuardTest.java} | 32 +++++++------- 3 files changed, 51 insertions(+), 56 deletions(-) rename phoenix-core/src/test/java/org/apache/phoenix/{coprocessor/CompactionScannerReplicationGuardTest.java => replication/reader/ReplicationCompactionGuardTest.java} (67%) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 6945599e604..7f41dad9e7f 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -207,8 +207,9 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store, boolean guardEnabled = conf.getBoolean(QueryServices.REPLICATION_COMPACTION_GUARD_ENABLED, QueryServicesOptions.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); if (replayEnabled && guardEnabled) { - this.maxLookbackWindowStart = applyReplicationConsistencyGuard(this.maxLookbackWindowStart, - conf, tableName, columnFamilyName); + this.maxLookbackWindowStart = + ReplicationLogReplayService.applyReplicationConsistencyGuard(this.maxLookbackWindowStart, + conf, tableName, columnFamilyName); } ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor(); this.major = major && !forceMinorCompaction; @@ -370,44 +371,6 @@ public static long getMaxLookbackInMillis(String tableName, String columnFamilyN : maxLookbackMap.get(tableName + CompactionScanner.SEPARATOR + columnFamilyName); } - /** - * Applies the replication replay consistency point as a floor on maxLookbackWindowStart. On - * standby clusters, this prevents compaction from dropping delete markers that have timestamps - * newer than the consistency point. - */ - @VisibleForTesting - static long applyReplicationConsistencyGuard(long currentMaxLookbackWindowStart, - Configuration conf, String tableName, String columnFamilyName) { - try { - long consistencyPoint = getConsistencyPointFromReplayService(conf); - return adjustMaxLookbackWindowStart(currentMaxLookbackWindowStart, consistencyPoint, - tableName, columnFamilyName); - } catch (Exception e) { - LOGGER - .warn("Replication guard enabled but consistency point unavailable for table={} store={}." - + " Retaining all delete markers.", tableName, columnFamilyName, e); - return 0L; - } - } - - @VisibleForTesting - static long getConsistencyPointFromReplayService(Configuration conf) throws Exception { - ReplicationLogReplayService service = ReplicationLogReplayService.getInstance(conf); - return service.getConsistencyPoint(); - } - - @VisibleForTesting - static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart, - long consistencyPoint, String tableName, String columnFamilyName) { - long adjusted = Math.min(currentMaxLookbackWindowStart, consistencyPoint); - if (adjusted < currentMaxLookbackWindowStart) { - LOGGER.info( - "Replication guard: table={} store={} maxLookbackWindowStart adjusted from {} to {}" - + " (consistencyPoint={})", - tableName, columnFamilyName, currentMaxLookbackWindowStart, adjusted, consistencyPoint); - } - return adjusted; - } static class CellTimeComparator implements Comparator { public static final CellTimeComparator COMPARATOR = new CellTimeComparator(); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index 3c3c92de68f..37e84579dba 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -240,6 +240,38 @@ public long getConsistencyPoint() throws IOException, SQLException { return consistencyPoint; } + /** + * Applies the replication replay consistency point as a floor on maxLookbackWindowStart. On + * standby clusters, this prevents compaction from dropping delete markers that have timestamps + * newer than the consistency point. + */ + @VisibleForTesting + public static long applyReplicationConsistencyGuard(long currentMaxLookbackWindowStart, + Configuration conf, String tableName, String columnFamilyName) { + try { + long consistencyPoint = getInstance(conf).getConsistencyPoint(); + return adjustMaxLookbackWindowStart(currentMaxLookbackWindowStart, consistencyPoint, + tableName, columnFamilyName); + } catch (Exception e) { + LOG.warn("Replication guard enabled but consistency point unavailable for table={} store={}." + + " Retaining all delete markers.", tableName, columnFamilyName, e); + return 0L; + } + } + + @VisibleForTesting + public static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart, + long consistencyPoint, String tableName, String columnFamilyName) { + long adjusted = Math.min(currentMaxLookbackWindowStart, consistencyPoint); + if (adjusted < currentMaxLookbackWindowStart) { + LOG.info( + "Replication guard: table={} store={} maxLookbackWindowStart adjusted from {} to {}" + + " (consistencyPoint={})", + tableName, columnFamilyName, currentMaxLookbackWindowStart, adjusted, consistencyPoint); + } + return adjusted; + } + /** Returns the list of HA groups on the cluster */ protected List getReplicationGroups() throws SQLException { return HAGroupStoreManager.getInstance(conf).getHAGroupNames(); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionScannerReplicationGuardTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java similarity index 67% rename from phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionScannerReplicationGuardTest.java rename to phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java index 6fbc43bea2a..3248106134a 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/coprocessor/CompactionScannerReplicationGuardTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java @@ -15,18 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.phoenix.coprocessor; +package org.apache.phoenix.replication.reader; import static org.junit.Assert.assertEquals; import org.junit.Test; /** - * Tests for the replication consistency point guard in CompactionScanner. Tests the pure adjustment - * logic (adjustMaxLookbackWindowStart) which is the core of the guard — ensuring + * Tests for the replication consistency point guard in ReplicationLogReplayService. Tests the pure + * adjustment logic (adjustMaxLookbackWindowStart) which is the core of the guard — ensuring * maxLookbackWindowStart is floored to the consistency point. */ -public class CompactionScannerReplicationGuardTest { +public class ReplicationCompactionGuardTest { private static final String TABLE_NAME = "TEST_TABLE"; private static final String CF_NAME = "0"; @@ -36,8 +36,8 @@ public void testAdjustsWindowWhenConsistencyPointIsLower() { long maxLookbackWindowStart = 1000000L; long consistencyPoint = 500000L; - long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( + maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(consistencyPoint, result); } @@ -47,8 +47,8 @@ public void testNoChangeWhenConsistencyPointIsHigher() { long maxLookbackWindowStart = 500000L; long consistencyPoint = 1000000L; - long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( + maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(maxLookbackWindowStart, result); } @@ -58,8 +58,8 @@ public void testNoChangeWhenConsistencyPointEqualsWindowStart() { long maxLookbackWindowStart = 500000L; long consistencyPoint = 500000L; - long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( + maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(maxLookbackWindowStart, result); } @@ -69,8 +69,8 @@ public void testConsistencyPointAtZeroRetainsAll() { long maxLookbackWindowStart = 1000000L; long consistencyPoint = 0L; - long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( + maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(0L, result); } @@ -80,8 +80,8 @@ public void testLargeTimestampsNoAdjustmentNeeded() { long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; long consistencyPoint = System.currentTimeMillis() - 120000L; - long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( + maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(maxLookbackWindowStart, result); } @@ -91,8 +91,8 @@ public void testConsistencyPointFarInPastPushesWindowBack() { long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; long consistencyPoint = System.currentTimeMillis() - 604800000L; - long result = CompactionScanner.adjustMaxLookbackWindowStart(maxLookbackWindowStart, - consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( + maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(consistencyPoint, result); } From 2b542781c6466ca2ad19410ce15b037f21e0e74a Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Wed, 27 May 2026 20:12:52 +0530 Subject: [PATCH 3/6] PHOENIX-7863 Revert getConsistencyPoint visibility to protected Now that the guard logic lives in the same class, public access is no longer needed. Tests are in the same package and can still access it. --- .../phoenix/replication/reader/ReplicationLogReplayService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index 37e84579dba..e5a53a74c64 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -230,7 +230,7 @@ protected void stopReplicationReplay() throws IOException, SQLException { * @throws IOException if there's an error retrieving consistency points from replication groups * @throws SQLException if there's an error accessing HA group information */ - public long getConsistencyPoint() throws IOException, SQLException { + protected long getConsistencyPoint() throws IOException, SQLException { long consistencyPoint = EnvironmentEdgeManager.currentTime(); List replicationGroups = getReplicationGroups(); for (String replicationGroup : replicationGroups) { From 6404215f612bca179fe59e43d60a89fb597949e7 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Wed, 27 May 2026 20:23:31 +0530 Subject: [PATCH 4/6] PHOENIX-7863 Move guard config constants to ReplicationLogReplayService Co-locate REPLICATION_COMPACTION_GUARD_ENABLED and its default with the other replication config constants in ReplicationLogReplayService, removing them from QueryServices/QueryServicesOptions. --- .../java/org/apache/phoenix/query/QueryServices.java | 2 -- .../apache/phoenix/query/QueryServicesOptions.java | 2 -- .../apache/phoenix/coprocessor/CompactionScanner.java | 5 +++-- .../reader/ReplicationLogReplayService.java | 11 +++++++++++ .../reader/CompactionReplicationGuardDisabledIT.java | 3 ++- .../reader/CompactionReplicationGuardIT.java | 3 ++- 6 files changed, 18 insertions(+), 8 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index 1609d891582..72075853aa8 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -644,8 +644,6 @@ public interface QueryServices extends SQLCloseable { String SYNCHRONOUS_REPLICATION_ENABLED = "phoenix.synchronous.replication.enabled"; - String REPLICATION_COMPACTION_GUARD_ENABLED = "phoenix.replication.compaction.guard.enabled"; - // HA Group Store sync job interval in seconds String HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = "phoenix.ha.group.store.sync.interval.seconds"; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 997f91baafb..339b6763a45 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -514,8 +514,6 @@ public class QueryServicesOptions { public static final Boolean DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED = false; - public static final boolean DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED = true; - // Default HA Group Store sync job interval in seconds (15 minutes = 900 seconds) public static final int DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS = 900; diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 7f41dad9e7f..7a954d06e1d 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -204,8 +204,9 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store, boolean replayEnabled = conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED); - boolean guardEnabled = conf.getBoolean(QueryServices.REPLICATION_COMPACTION_GUARD_ENABLED, - QueryServicesOptions.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); + boolean guardEnabled = conf.getBoolean( + ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, + ReplicationLogReplayService.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); if (replayEnabled && guardEnabled) { this.maxLookbackWindowStart = ReplicationLogReplayService.applyReplicationConsistencyGuard(this.maxLookbackWindowStart, diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index e5a53a74c64..33cd922bcb8 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -51,6 +51,17 @@ public class ReplicationLogReplayService { */ public static final boolean DEFAULT_REPLICATION_REPLAY_ENABLED = false; + /** + * Configuration key for enabling/disabling the replication compaction guard + */ + public static final String REPLICATION_COMPACTION_GUARD_ENABLED = + "phoenix.replication.compaction.guard.enabled"; + + /** + * Default value for replication compaction guard enabled flag + */ + public static final boolean DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED = true; + /** * Number of threads in the executor pool for the replication replay service */ diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java index 40adc1126af..3755ab73c27 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java @@ -63,7 +63,8 @@ public static synchronized void doSetup() throws Exception { props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, Boolean.toString(true)); - props.put(QueryServices.REPLICATION_COMPACTION_GUARD_ENABLED, Boolean.toString(false)); + props.put(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, + Boolean.toString(false)); props.put("hbase.procedure.remote.dispatcher.delay.msec", "0"); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java index bfcec12b646..7b53db3c1f8 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java @@ -64,7 +64,8 @@ public static synchronized void doSetup() throws Exception { props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, Boolean.toString(true)); - props.put(QueryServices.REPLICATION_COMPACTION_GUARD_ENABLED, Boolean.toString(true)); + props.put(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, + Boolean.toString(true)); props.put("hbase.procedure.remote.dispatcher.delay.msec", "0"); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } From 28108c9cfb555124da833ba676b8912c34da03f3 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Wed, 27 May 2026 20:53:53 +0530 Subject: [PATCH 5/6] PHOENIX-7863 Apply spotless formatting --- .../coprocessor/CompactionScanner.java | 12 ++++------ .../CompactionReplicationGuardDisabledIT.java | 1 - .../reader/CompactionReplicationGuardIT.java | 1 - .../ReplicationCompactionGuardTest.java | 24 +++++++++---------- 4 files changed, 17 insertions(+), 21 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 7a954d06e1d..33fb6bf0cc8 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -204,13 +204,12 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store, boolean replayEnabled = conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED); - boolean guardEnabled = conf.getBoolean( - ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, - ReplicationLogReplayService.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); + boolean guardEnabled = + conf.getBoolean(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, + ReplicationLogReplayService.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); if (replayEnabled && guardEnabled) { - this.maxLookbackWindowStart = - ReplicationLogReplayService.applyReplicationConsistencyGuard(this.maxLookbackWindowStart, - conf, tableName, columnFamilyName); + this.maxLookbackWindowStart = ReplicationLogReplayService.applyReplicationConsistencyGuard( + this.maxLookbackWindowStart, conf, tableName, columnFamilyName); } ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor(); this.major = major && !forceMinorCompaction; @@ -372,7 +371,6 @@ public static long getMaxLookbackInMillis(String tableName, String columnFamilyN : maxLookbackMap.get(tableName + CompactionScanner.SEPARATOR + columnFamilyName); } - static class CellTimeComparator implements Comparator { public static final CellTimeComparator COMPARATOR = new CellTimeComparator(); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java index 3755ab73c27..8ed243e1690 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java @@ -32,7 +32,6 @@ import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.replication.reader.ReplicationLogReplayService; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.ManualEnvironmentEdge; import org.apache.phoenix.util.ReadOnlyProps; diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java index 7b53db3c1f8..1ad78f0ab56 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java @@ -32,7 +32,6 @@ import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.replication.reader.ReplicationLogReplayService; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.ManualEnvironmentEdge; import org.apache.phoenix.util.ReadOnlyProps; diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java index 3248106134a..0cffee289b0 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java @@ -36,8 +36,8 @@ public void testAdjustsWindowWhenConsistencyPointIsLower() { long maxLookbackWindowStart = 1000000L; long consistencyPoint = 500000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( - maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(consistencyPoint, result); } @@ -47,8 +47,8 @@ public void testNoChangeWhenConsistencyPointIsHigher() { long maxLookbackWindowStart = 500000L; long consistencyPoint = 1000000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( - maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(maxLookbackWindowStart, result); } @@ -58,8 +58,8 @@ public void testNoChangeWhenConsistencyPointEqualsWindowStart() { long maxLookbackWindowStart = 500000L; long consistencyPoint = 500000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( - maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(maxLookbackWindowStart, result); } @@ -69,8 +69,8 @@ public void testConsistencyPointAtZeroRetainsAll() { long maxLookbackWindowStart = 1000000L; long consistencyPoint = 0L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( - maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(0L, result); } @@ -80,8 +80,8 @@ public void testLargeTimestampsNoAdjustmentNeeded() { long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; long consistencyPoint = System.currentTimeMillis() - 120000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( - maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(maxLookbackWindowStart, result); } @@ -91,8 +91,8 @@ public void testConsistencyPointFarInPastPushesWindowBack() { long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; long consistencyPoint = System.currentTimeMillis() - 604800000L; - long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart( - maxLookbackWindowStart, consistencyPoint, TABLE_NAME, CF_NAME); + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); assertEquals(consistencyPoint, result); } From 01f14cabd310ffe35b4d6b9d5f4bedab38f32656 Mon Sep 17 00:00:00 2001 From: Himanshu Gwalani Date: Fri, 29 May 2026 15:10:39 +0530 Subject: [PATCH 6/6] PHOENIX-7863 Fix visibility and capacity hints from review - Remove incorrect @VisibleForTesting from applyReplicationConsistencyGuard (it is genuinely public, called from CompactionScanner) - Reduce adjustMaxLookbackWindowStart to package-private - Fix newHashMapWithExpectedSize(4) to 5 in both IT classes --- .../replication/reader/ReplicationLogReplayService.java | 3 +-- .../reader/CompactionReplicationGuardDisabledIT.java | 2 +- .../replication/reader/CompactionReplicationGuardIT.java | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index 33cd922bcb8..e413ed73e9b 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -256,7 +256,6 @@ protected long getConsistencyPoint() throws IOException, SQLException { * standby clusters, this prevents compaction from dropping delete markers that have timestamps * newer than the consistency point. */ - @VisibleForTesting public static long applyReplicationConsistencyGuard(long currentMaxLookbackWindowStart, Configuration conf, String tableName, String columnFamilyName) { try { @@ -271,7 +270,7 @@ public static long applyReplicationConsistencyGuard(long currentMaxLookbackWindo } @VisibleForTesting - public static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart, + static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart, long consistencyPoint, String tableName, String columnFamilyName) { long adjusted = Math.min(currentMaxLookbackWindowStart, consistencyPoint); if (adjusted < currentMaxLookbackWindowStart) { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java index 8ed243e1690..68ff6ed986f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java @@ -57,7 +57,7 @@ public class CompactionReplicationGuardDisabledIT extends BaseTest { @BeforeClass public static synchronized void doSetup() throws Exception { - Map props = Maps.newHashMapWithExpectedSize(4); + Map props = Maps.newHashMapWithExpectedSize(5); props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java index 1ad78f0ab56..7788155eec3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java @@ -58,7 +58,7 @@ public class CompactionReplicationGuardIT extends BaseTest { @BeforeClass public static synchronized void doSetup() throws Exception { - Map props = Maps.newHashMapWithExpectedSize(4); + Map props = Maps.newHashMapWithExpectedSize(5); props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED,