Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.replication.reader.ReplicationLogReplayService;
import org.apache.phoenix.schema.CompiledConditionalTTLExpression;
import org.apache.phoenix.schema.CompiledTTLExpression;
import org.apache.phoenix.schema.ConditionalTTLExpression;
Expand Down Expand Up @@ -199,6 +200,17 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store,
this.maxLookbackWindowStart = this.maxLookbackInMillis == 0
? compactionTime
: compactionTime - (this.maxLookbackInMillis + 1);
Configuration conf = env.getConfiguration();
boolean replayEnabled =
conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED,
ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED);
boolean guardEnabled =
conf.getBoolean(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED,
ReplicationLogReplayService.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED);
if (replayEnabled && guardEnabled) {
this.maxLookbackWindowStart = ReplicationLogReplayService.applyReplicationConsistencyGuard(
this.maxLookbackWindowStart, conf, tableName, columnFamilyName);
}
ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
this.major = major && !forceMinorCompaction;
this.minVersion = cfd.getMinVersions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
Expand All @@ -50,6 +51,17 @@ public class ReplicationLogReplayService {
*/
public static final boolean DEFAULT_REPLICATION_REPLAY_ENABLED = false;

/**
* Configuration key for enabling/disabling the replication compaction guard
*/
public static final String REPLICATION_COMPACTION_GUARD_ENABLED =
"phoenix.replication.compaction.guard.enabled";

/**
* Default value for replication compaction guard enabled flag
*/
public static final boolean DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED = true;

/**
* Number of threads in the executor pool for the replication replay service
*/
Expand Down Expand Up @@ -83,7 +95,7 @@ public class ReplicationLogReplayService {
private ScheduledExecutorService scheduler;
private volatile boolean isRunning = false;

private ReplicationLogReplayService(final Configuration conf) {
protected ReplicationLogReplayService(final Configuration conf) {
this.conf = conf;
}

Expand All @@ -105,6 +117,16 @@ public static ReplicationLogReplayService getInstance(Configuration conf) throws
return instance;
}

@VisibleForTesting
public static void setInstanceForTesting(ReplicationLogReplayService testInstance) {
instance = testInstance;
}

@VisibleForTesting
public static void resetInstanceForTesting() {
instance = null;
}

/**
* Starts the replication log replay service by initializing the scheduler and scheduling periodic
* replay operations for each HA Group.
Expand Down Expand Up @@ -229,6 +251,37 @@ protected long getConsistencyPoint() throws IOException, SQLException {
return consistencyPoint;
}

/**
* Applies the replication replay consistency point as a floor on maxLookbackWindowStart. On
* standby clusters, this prevents compaction from dropping delete markers that have timestamps
* newer than the consistency point.
*/
public static long applyReplicationConsistencyGuard(long currentMaxLookbackWindowStart,
Configuration conf, String tableName, String columnFamilyName) {
try {
long consistencyPoint = getInstance(conf).getConsistencyPoint();
return adjustMaxLookbackWindowStart(currentMaxLookbackWindowStart, consistencyPoint,
tableName, columnFamilyName);
} catch (Exception e) {
LOG.warn("Replication guard enabled but consistency point unavailable for table={} store={}."
+ " Retaining all delete markers.", tableName, columnFamilyName, e);
return 0L;
}
}

@VisibleForTesting
static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart,
long consistencyPoint, String tableName, String columnFamilyName) {
long adjusted = Math.min(currentMaxLookbackWindowStart, consistencyPoint);
if (adjusted < currentMaxLookbackWindowStart) {
LOG.info(
"Replication guard: table={} store={} maxLookbackWindowStart adjusted from {} to {}"
+ " (consistencyPoint={})",
tableName, columnFamilyName, currentMaxLookbackWindowStart, adjusted, consistencyPoint);
}
return adjusted;
}

/** Returns the list of HA groups on the cluster */
protected List<String> getReplicationGroups() throws SQLException {
return HAGroupStoreManager.getInstance(conf).getHAGroupNames();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.replication.reader;

import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY;
import static org.apache.phoenix.util.TestUtil.assertRawRowCount;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Map;
import org.apache.hadoop.hbase.TableName;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.TestUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;

/**
* Integration test verifying that the replication compaction guard does NOT interfere with normal
* compaction when explicitly disabled via configuration.
*/
@Category(NeedsOwnMiniClusterTest.class)
public class CompactionReplicationGuardDisabledIT extends BaseTest {

private static final int MAX_LOOKBACK_AGE = 15;
private static final int ROWS_POPULATED = 2;
private ManualEnvironmentEdge injectEdge;

@BeforeClass
public static synchronized void doSetup() throws Exception {
Map<String, String> props = Maps.newHashMapWithExpectedSize(5);
props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE));
props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true));
props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED,
Boolean.toString(true));
props.put(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED,
Boolean.toString(false));
props.put("hbase.procedure.remote.dispatcher.delay.msec", "0");
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}

@Before
public void beforeTest() throws Exception {
EnvironmentEdgeManager.reset();
injectEdge = new ManualEnvironmentEdge();
injectEdge.setValue(System.currentTimeMillis());
EnvironmentEdgeManager.injectEdge(injectEdge);
}

@After
public synchronized void afterTest() throws Exception {
ReplicationLogReplayService.resetInstanceForTesting();
EnvironmentEdgeManager.reset();
boolean refCountLeaked = isAnyStoreRefCountLeaked();
assertFalse("refCount leaked", refCountLeaked);
}

/**
* When guard is disabled, delete markers are purged normally by maxLookback even though the
* consistency point would have protected them if the guard were enabled.
*/
@Test(timeout = 120000L)
public void testGuardDisabledDeleteMarkersPurgedByMaxLookback() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
String dataTableName = generateUniqueName();
createTable(dataTableName);
TableName dataTable = TableName.valueOf(dataTableName);
populateTable(dataTableName);

injectEdge.incrementValue(1);
long beforeDeleteTime = EnvironmentEdgeManager.currentTimeMillis();

// Delete a row
conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'");
conn.commit();
injectEdge.incrementValue(1);

// Set consistency point BEFORE delete — guard would retain if enabled
long consistencyPoint = beforeDeleteTime - 1;
ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class);
when(mockService.getConsistencyPoint()).thenReturn(consistencyPoint);
ReplicationLogReplayService.setInstanceForTesting(mockService);

// Advance past maxLookback
injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000);

flush(dataTable);
majorCompact(dataTable);

// Guard disabled — delete marker purged by maxLookback as normal
assertRawRowCount(conn, dataTable, ROWS_POPULATED - 1);
}
}

private void flush(TableName table) throws IOException {
getUtility().getAdmin().flush(table);
}

private void majorCompact(TableName table) throws Exception {
TestUtil.majorCompact(getUtility(), table);
}

private void createTable(String tableName) throws SQLException {
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.createStatement().execute(
"CREATE TABLE " + tableName + " (id VARCHAR(10) NOT NULL PRIMARY KEY, val1 VARCHAR(10),"
+ " val2 VARCHAR(10), val3 VARCHAR(10))");
conn.commit();
}
}

private void populateTable(String tableName) throws SQLException {
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.createStatement()
.execute("UPSERT INTO " + tableName + " VALUES ('a', 'ab', 'abc', 'abcd')");
conn.commit();
conn.createStatement()
.execute("UPSERT INTO " + tableName + " VALUES ('b', 'bc', 'bcd', 'bcde')");
conn.commit();
}
}
}
Loading