Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest;
import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest.CleanupRequestBuilder;
Expand Down Expand Up @@ -97,6 +98,17 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
LOG.info("Starting cleaning for {}, based on min open {}", ci,
(ci.minOpenWriteId > 0) ? "writeId: " + ci.minOpenWriteId : "txnId: " + minOpenTxn);

// Consistency guard: for an entry with a compaction txn id but no next txn id recorded, look up
// the status of the compaction txn. If that txn is ABORTED, the entry cannot be cleaned even
// though it is queued as 'ready for cleaning', so it is marked failed and the clean is skipped.
if (ci.nextTxnId == 0 && ci.txnId > 0) {
  TxnStatus status = txnHandler.getTransactionStatus(ci.txnId);
  if (TxnStatus.ABORTED == status) {
    // Pass ci as the argument for the '{}' placeholder so the log line identifies the compaction.
    LOG.warn("The compaction {} is in invalid state. The compaction is marked as 'ready for cleaning', " +
        "but its txn is in aborted state. Marking this compaction as failed.", ci);
    // The message text is asserted verbatim by tests; keep it unchanged.
    ci.errorMessage = "Invalid state: the compaction txn (" + ci.txnId + ") is already aborted.";
    txnHandler.markFailed(ci);
    return;
  }
}

PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" +
(!isNull(ci.type) ? ci.type.toString().toLowerCase() : null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor.service;

import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -123,7 +124,7 @@
}
}

public Boolean compact(Table table, CompactionInfo ci) throws Exception {

Check warning on line 127 in ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Method compact length is 153 lines (max allowed is 150).

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ47j5mE6zqM_Hxst0uZ&open=AZ47j5mE6zqM_Hxst0uZ&pullRequest=6498

try (CompactionTxn compactionTxn = new CompactionTxn()) {

Expand Down Expand Up @@ -198,6 +199,9 @@
txnWriteIds.addTableValidWriteIdList(tblValidWriteIds);
conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, txnWriteIds.toString());

msc.addWriteIdsToMinHistory(compactionTxn.getTxnId(),
ImmutableMap.of(fullTableName, txnWriteIds.getMinOpenWriteId(fullTableName)));

ci.highestWriteId = tblValidWriteIds.getHighWatermark();
//this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we keep metadata about
//it until after any data written by it are physically removed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;

import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
Expand All @@ -26,6 +27,7 @@
import org.apache.hadoop.hive.common.ServerUtils;
import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
Expand Down Expand Up @@ -769,6 +771,12 @@ long compactInTxn(CompactionRequest rqst, CommitAction commitAction) throws Exce
txnHandler.getValidWriteIds(writeIdsRequest).getTblValidWriteIds()
.getFirst());

final ValidTxnWriteIdList txnWriteIds = new ValidTxnWriteIdList(compactorTxnId);
txnWriteIds.addTableValidWriteIdList(tblValidWriteIds);

txnHandler.addWriteIdsToMinHistory(compactorTxnId,
ImmutableMap.of(ci.getFullTableName(), txnWriteIds.getMinOpenWriteId(ci.getFullTableName())));

ci.highestWriteId = tblValidWriteIds.getHighWatermark();
txnHandler.updateCompactorState(ci, compactorTxnId);

Expand Down
147 changes: 144 additions & 3 deletions ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionResponse;
Expand All @@ -36,6 +38,7 @@
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.testutil.TxnStoreHelper;
import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandler;
Expand Down Expand Up @@ -63,6 +66,10 @@
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
import static org.apache.hadoop.hive.metastore.txn.TxnStore.CLEANING_RESPONSE;
import static org.apache.hadoop.hive.metastore.txn.TxnStore.INITIATED_STATE;
import static org.apache.hadoop.hive.metastore.txn.TxnStore.WORKING_RESPONSE;
import static org.apache.hadoop.hive.metastore.txn.TxnStore.WORKING_STATE;
import static org.apache.hadoop.hive.ql.io.AcidUtils.addVisibilitySuffix;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -1347,7 +1354,121 @@
}
}

private String createDeltasAndRunMajorCompaction(Table table, long minTxnId, int numberOfDeltas) throws Exception {
/**
 * Runs three major compactions on the same table, where the first compaction's txn is still
 * open (per the in-test comments) while the cleaner runs. Checks that:
 * <ul>
 *   <li>the cleaner leaves the queue entries in the cleaning state while that txn is open;</li>
 *   <li>after the first compaction's txn is aborted, the cleaner marks that compaction as failed
 *       with the "Invalid state" error message and removes no directories in that run;</li>
 *   <li>the following two cleaner runs remove the directories made obsolete by the second and
 *       third compaction, which end up in the succeeded state.</li>
 * </ul>
 */
@Test
public void testCleanerRunsWithOpenCompactionTxn() throws Exception {

Check warning on line 1358 in ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this 'public' modifier.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ47j5pd6zqM_Hxst0ua&open=AZ47j5pd6zqM_Hxst0ua&pullRequest=6498
// Set the cleaner's max retry attempts to 0 for this test.
MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS, 0L);

String dbName = "default";
String tblName = "campcnb";
Table t = newTable(dbName, tblName, false);
addDeltaFile(t, null, 1L, 1L, 1);
addDeltaFile(t, null, 2L, 2L, 1);
addDeltaFile(t, null, 3L, 3L, 1);
addDeltaFile(t, null, 4L, 4L, 1);
burnThroughTransactions(dbName, tblName, 4, null, null);

// trigger compaction
CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
long txnId = compactInTxn(rqst, CommitAction.MARK_COMPACTED);
addBaseFile(t, null, 4L, 6, txnId);
// Despite the "delta" variable name this is a base directory name.
// NOTE(review): the hard-coded visibility suffix presumably assumes txnId == 5 - confirm.
String deltaName1 = "base_4_v0000005";

// should not clean anything since the compaction txn is still open
startCleaner();

ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
assertEquals(1, rsp.getCompactsSize());
assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());

String deltaName2 = createDeltasAndRunMajorCompaction(t, 5, 2);

// should not clean anything since the compaction txn is still open
startCleaner();

rsp = txnHandler.showCompact(new ShowCompactRequest());
assertEquals(2, rsp.getCompactsSize());
assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(1).getState());

String deltaName3 = createDeltasAndRunMajorCompaction(t, 7, 2);

//Abort the compaction txn
txnHandler.abortTxns(new AbortTxnsRequest(Collections.singletonList(txnId)));
// NOTE(review): fixed 10 s wait after the abort; the reason is not visible in this test -
// presumably some retention/timeout window has to elapse. Confirm, and prefer a
// deterministic wait if possible.
Thread.sleep(10000L);

Check warning on line 1397 in ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this use of "Thread.sleep()".

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ47j5pd6zqM_Hxst0uc&open=AZ47j5pd6zqM_Hxst0uc&pullRequest=6498

// Before any cleaning: the three base directories plus delta directories 1..8 are expected.
Set<String> expectedDirs = new HashSet<>();
expectedDirs.add(deltaName1);
expectedDirs.add(deltaName2);
expectedDirs.add(deltaName3);
for (int i = 1; i < 9; i++) {
expectedDirs.add(makeDeltaDirName(i, i));
}
verifyDirectories(t, expectedDirs);
// Should mark compaction 1 as failed since its txn is aborted; no directory is removed
startCleaner();
verifyDirectories(t, expectedDirs);

// Should find the second compaction: the first base and delta directories 1..6 go away
startCleaner();
expectedDirs.remove(deltaName1);
for (int i = 1; i < 7; i++) {
expectedDirs.remove(makeDeltaDirName(i, i));
}
verifyDirectories(t, expectedDirs);
// Should find the third compaction and delete the directories accordingly
startCleaner();
expectedDirs.remove(deltaName2);
for (int i = 7; i < 9; i++) {
expectedDirs.remove(makeDeltaDirName(i, i));
}
verifyDirectories(t, expectedDirs);

// The entry at index 2 is the first compaction: failed, with the aborted-txn error message.
rsp = txnHandler.showCompact(new ShowCompactRequest());
assertEquals(3, rsp.getCompactsSize());
assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(1).getState());
assertEquals(TxnStore.FAILED_RESPONSE, rsp.getCompacts().get(2).getState());
assertEquals("Invalid state: the compaction txn (" + txnId + ") is already aborted.",
rsp.getCompacts().get(2).getErrorMessage());
}

/**
 * Scenario: a major compaction is started and its txn is left open, then the compaction is
 * forcibly put back into the initiated state and run again. While the first compaction's txn
 * is still open the cleaner must not remove anything (the entry stays in the cleaning state and
 * all 6 directories remain); once that txn is aborted, the next cleaner run leaves a single
 * directory.
 */
@Test
public void cleanupAfterKilledAndRetriedMajorCompaction() throws Exception {

Check warning on line 1436 in ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this 'public' modifier.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ47j5pd6zqM_Hxst0ub&open=AZ47j5pd6zqM_Hxst0ub&pullRequest=6498
Table t = prepareTestTable();
CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
long openCompactTxn = compactInTxn(rqst, CommitAction.NONE);
addBaseFile(t, null, 25L, 25, openCompactTxn);

txnHandler.revokeTimedoutWorkers(1L);
// an open txn should prevent the retry
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
assertEquals(1, rsp.getCompactsSize());
assertEquals(WORKING_RESPONSE, rsp.getCompacts().getFirst().getState());

// force retry: reset the queue entry directly in the DB, then compact again
revokeTimedoutWorkers(conf);
long compactTxn = compactInTxn(rqst);
addBaseFile(t, null, 25L, 25, compactTxn);

startCleaner();

// Validate that the cleanup attempt has failed.
rsp = txnHandler.showCompact(new ShowCompactRequest());
assertEquals(1, rsp.getCompactsSize());
assertEquals(CLEANING_RESPONSE, rsp.getCompacts().getFirst().getState());
// Check that the files are not removed
List<Path> paths = getDirectories(conf, t, null);
assertEquals(6, paths.size());

txnHandler.abortTxns(new AbortTxnsRequest(Collections.singletonList(openCompactTxn)));
startCleaner();
// Check that the obsolete files are removed now that the first compaction txn is aborted
paths = getDirectories(conf, t, null);
assertEquals(1, paths.size());
}

protected String createDeltasAndRunMajorCompaction(Table table, long minTxnId, int numberOfDeltas)
throws Exception {
String dbName = table.getDbName();
String tableName = table.getTableName();
for (int i = 0; i < numberOfDeltas; i++) {
Expand All @@ -1361,10 +1482,30 @@
return AcidUtils.addVisibilitySuffix(AcidUtils.BASE_PREFIX + maxTxnId, compactTxn);
}

private void verifyDirectories(Table table, Set<String> expectedDirs) throws Exception {
/**
 * Asserts that the names of the directories returned by {@code getDirectories} for the given
 * table are exactly the names in {@code expectedDirs} (order and duplicates are ignored).
 *
 * @param table the table whose directories are listed
 * @param expectedDirs the directory names that must be present, and no others
 * @throws Exception if listing the directories fails
 */
protected void verifyDirectories(Table table, Set<String> expectedDirs) throws Exception {
  Set<String> foundNames = new HashSet<>();
  for (Path dir : getDirectories(conf, table, null)) {
    foundNames.add(dir.getName());
  }
  Assert.assertEquals(expectedDirs, foundNames);
}

}
/**
 * Resets every COMPACTION_QUEUE row that is in the working state back to the initiated state,
 * clearing its worker id and start time, by issuing an UPDATE directly against the test DB.
 *
 * @param conf configuration handed to {@code TestTxnDbUtil.executeUpdate}
 * @throws Exception if the update fails
 */
private static void revokeTimedoutWorkers(Configuration conf) throws Exception {
  // First %c is the state to set, second %c is the state to match.
  String requeueTemplate = """
      UPDATE "COMPACTION_QUEUE"
      SET "CQ_WORKER_ID" = NULL, "CQ_START" = NULL, "CQ_STATE" = '%c'
      WHERE "CQ_STATE" = '%c'
      """;
  String requeueSql = requeueTemplate.formatted(INITIATED_STATE, WORKING_STATE);
  TestTxnDbUtil.executeUpdate(conf, requeueSql);
}

/**
 * Creates the table {@code default.camtc} with one base file (20), three delta files
 * (21-22, 23-24, 25-25), and then burns through 25 transactions on it.
 *
 * @return the newly created table
 * @throws Exception if creating the table, the files or the transactions fails
 */
protected Table prepareTestTable() throws Exception {
  final String dbName = "default";
  final String tblName = "camtc";

  Table table = newTable(dbName, tblName, false);

  // Layout: base_20, then deltas 21-22, 23-24 and 25-25.
  addBaseFile(table, null, 20L, 20);
  addDeltaFile(table, null, 21L, 22L, 2);
  addDeltaFile(table, null, 23L, 24L, 2);
  addDeltaFile(table, null, 25L, 25, 2);

  burnThroughTransactions(dbName, tblName, 25);
  return table;
}

}
Loading
Loading