diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java index a9ede32a5912..0a9be7616bb2 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -28,6 +29,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -127,6 +129,8 @@ public void setUp() throws Exception { "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true); MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true); + HiveConf.setTimeVar(hiveConf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 0, TimeUnit.SECONDS); + TestTxnDbUtil.setConfValues(hiveConf); hiveConf.setInt(MRJobConfig.MAP_MEMORY_MB, 1024); hiveConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 1024); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java index 428aedcd8f45..c8166d8ee7d7 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java @@ -56,6 +56,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME; import static org.apache.hadoop.hive.ql.txn.compactor.CompactorTestUtil.executeStatementOnDriverAndReturnResults; import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver; import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.dropTables; @@ -106,6 +107,7 @@ protected void setupWithConf(HiveConfForTest hiveConf) throws Exception { MetastoreConf.setTimeVar(hiveConf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, 2, TimeUnit.SECONDS); MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true); MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true); + HiveConf.setTimeVar(hiveConf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 0, TimeUnit.SECONDS); TestTxnDbUtil.setConfValues(hiveConf); TestTxnDbUtil.cleanDb(hiveConf); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorBase.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorBase.java index b606515391c0..49b231f086e2 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorBase.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorBase.java @@ -46,6 +46,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME; + class TestCompactorBase { private static final Logger LOG = LoggerFactory.getLogger(TestCompactorBase.class); @@ -92,6 +94,7 @@ public void setup() throws Exception { MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true); MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true); MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true); + HiveConf.setTimeVar(hiveConf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 0, TimeUnit.SECONDS); TestTxnDbUtil.setConfValues(hiveConf); TestTxnDbUtil.cleanDb(hiveConf); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java index 2a593e4927f7..5d9a4b14fbe0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.metrics.Metrics; import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; import org.apache.hadoop.hive.metastore.metrics.PerfLogger; +import org.apache.hadoop.hive.metastore.txn.TxnHandler; import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; @@ -73,7 +74,8 @@ public CompactionCleaner(HiveConf conf, TxnStore txnHandler, @Override public List getTasks(HiveConf conf) throws MetaException { long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner(); - long retentionTime = HiveConf.getBoolVar(conf, HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED) + long retentionTime = (HiveConf.getBoolVar(conf, HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED) + || TxnHandler.ConfVars.useMinHistoryWriteId()) ? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS) : 0; List readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java index 3538b07c1696..d1c7e3972d03 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java @@ -198,6 +198,10 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception { txnWriteIds.addTableValidWriteIdList(tblValidWriteIds); conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, txnWriteIds.toString()); + // Register in MIN_HISTORY_WRITE_ID so the per-table cleaner admission blocks while open. + msc.addWriteIdsToMinHistory(compactionTxn.getTxnId(), + Map.of(fullTableName, txnWriteIds.getMinOpenWriteId(fullTableName))); + ci.highestWriteId = tblValidWriteIds.getHighWatermark(); //this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we keep metadata about //it until after any data written by it are physically removed diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index d82e1cad9a02..ba093d0ac963 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -66,6 +66,7 @@ import org.slf4j.LoggerFactory; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME; import static org.apache.hadoop.hive.metastore.DatabaseProduct.determineDatabaseProduct; import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn; @@ -152,6 +153,7 @@ void setUpInternal() throws Exception { hiveConf.setBoolean("mapred.input.dir.recursive", true); MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true); MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true); + HiveConf.setTimeVar(hiveConf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 0, TimeUnit.SECONDS); TestTxnDbUtil.setConfValues(hiveConf); TestTxnDbUtil.prepDb(hiveConf); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java index 628e51f2a387..8f151b523cb6 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java @@ -38,6 +38,9 @@ import org.junit.BeforeClass; import java.io.File; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME; /** * Base class for "end-to-end" tests for DbTxnManager and simulate concurrent queries. @@ -59,6 +62,8 @@ public DbTxnManagerEndToEndTestBase() { MetastoreConf.setVar(conf, MetastoreConf.ConfVars.WAREHOUSE, getWarehouseDir()); MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true); MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true); + HiveConf.setTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 0, TimeUnit.SECONDS); + TestTxnDbUtil.setConfValues(conf); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java index 322045b0dadd..d97f457c11c9 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.common.ServerUtils; import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidTxnWriteIdList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; @@ -62,6 +63,7 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.hive.metastore.metrics.AcidMetricService; +import org.apache.hadoop.hive.metastore.txn.TxnHandler; import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; @@ -113,6 +115,8 @@ import javax.management.MBeanServer; import javax.management.ObjectName; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME; + /** * Super class for all of the compactor test modules. */ @@ -143,8 +147,12 @@ protected final void setup(HiveConf conf) throws Exception { MetastoreConf.setBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON, true); MetastoreConf.setBoolVar(conf, ConfVars.COMPACTOR_CLEANER_ON, true); MetastoreConf.setBoolVar(conf, ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, useMinHistoryWriteId()); + TxnHandler.ConfVars.setUseMinHistoryWriteId(useMinHistoryWriteId()); MetastoreConf.setVar(conf, ConfVars.COMPACTOR_INITIATOR_TABLE_OPTIMIZERS, "org.apache.hadoop.hive.ql.txn.compactor.AcidTableOptimizer"); + if (useMinHistoryWriteId()) { + HiveConf.setTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 0, TimeUnit.SECONDS); + } // Set this config to true in the base class, there are extended test classes which set this config to false. MetastoreConf.setBoolVar(conf, ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true); TestTxnDbUtil.setConfValues(conf); @@ -173,6 +181,12 @@ protected void startCleaner() throws Exception { runOneLoopOfCompactorThread(CompactorThreadType.CLEANER); } + void startCleaner(long retentionTime, TimeUnit timeUnit) throws Exception { + HiveConf.setTimeVar(conf, + HIVE_COMPACTOR_CLEANER_RETENTION_TIME, retentionTime, timeUnit); + startCleaner(); + } + protected void runAcidMetricService() { TestTxnDbUtil.setConfValues(conf); AcidMetricService t = new AcidMetricService(); @@ -388,7 +402,7 @@ protected void burnThroughTransactions(String dbName, String tblName, int num, S txnHandler.commitTxn(new CommitTxnRequest(tid)); } else if (open.contains(tid) && useMinHistoryWriteId()){ txnHandler.addWriteIdsToMinHistory(tid, - Collections.singletonMap(dbName + "." + tblName, minOpenWriteId)); + Map.of(dbName + "." + tblName, minOpenWriteId)); } } } @@ -754,14 +768,13 @@ long compactInTxn(CompactionRequest rqst, CommitAction commitAction) throws Exce ci.runAs = rqst.getRunas() == null ? System.getProperty("user.name") : rqst.getRunas(); long compactorTxnId = openTxn(TxnType.COMPACTION); + String fullTableName = ci.getFullTableName().toLowerCase(); // Need to create a valid writeIdList to set the highestWriteId in ci ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList( - txnHandler.getOpenTxns(Collections.singletonList(TxnType.READ_ONLY)), compactorTxnId); + txnHandler.getOpenTxns(List.of(TxnType.READ_ONLY)), compactorTxnId); - GetValidWriteIdsRequest writeIdsRequest = new GetValidWriteIdsRequest( - Collections.singletonList( - ci.getFullTableName().toLowerCase())); + GetValidWriteIdsRequest writeIdsRequest = new GetValidWriteIdsRequest(List.of(fullTableName)); writeIdsRequest.setValidTxnList(validTxnList.writeToString()); // with this ValidWriteIdList is capped at whatever HWM validTxnList has @@ -769,6 +782,14 @@ long compactInTxn(CompactionRequest rqst, CommitAction commitAction) throws Exce txnHandler.getValidWriteIds(writeIdsRequest).getTblValidWriteIds() .getFirst()); + if (useMinHistoryWriteId()) { + ValidTxnWriteIdList txnWriteIds = new ValidTxnWriteIdList(compactorTxnId); + txnWriteIds.addTableValidWriteIdList(tblValidWriteIds); + + txnHandler.addWriteIdsToMinHistory(compactorTxnId, + Map.of(fullTableName, txnWriteIds.getMinOpenWriteId(fullTableName))); + } + ci.highestWriteId = tblValidWriteIds.getHighWatermark(); txnHandler.updateCompactorState(ci, compactorTxnId); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java index 8b1216335acd..6399bb5adbe7 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java @@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; @@ -32,9 +33,12 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; -import static org.apache.hadoop.hive.metastore.txn.TxnStore.FAILED_RESPONSE; +import static org.apache.hadoop.hive.metastore.txn.TxnStore.CLEANING_RESPONSE; import static org.apache.hadoop.hive.metastore.txn.TxnStore.SUCCEEDED_RESPONSE; import static org.apache.hadoop.hive.metastore.txn.TxnStore.WORKING_RESPONSE; import static org.apache.hadoop.hive.metastore.txn.TxnStore.INITIATED_STATE; @@ -59,7 +63,7 @@ protected boolean useMinHistoryWriteId() { @Test public void cleanupAfterAbortedAndRetriedMajorCompaction() throws Exception { - Table t = prepareTestTable(); + Table t = prepareTestTable("camtc"); CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR); long compactTxn = compactInTxn(rqst, CommitAction.ABORT); addBaseFile(t, null, 25L, 25, compactTxn); @@ -83,10 +87,10 @@ public void cleanupAfterAbortedAndRetriedMajorCompaction() throws Exception { @Test public void cleanupAfterKilledAndRetriedMajorCompaction() throws Exception { - Table t = prepareTestTable(); + Table t = prepareTestTable("camtc"); CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR); - long compactTxn = compactInTxn(rqst, CommitAction.NONE); - addBaseFile(t, null, 25L, 25, compactTxn); + long compactTxn1 = compactInTxn(rqst, CommitAction.NONE); + addBaseFile(t, null, 25L, 25, compactTxn1); txnHandler.revokeTimedoutWorkers(1L); // an open txn should prevent the retry @@ -96,20 +100,32 @@ public void cleanupAfterKilledAndRetriedMajorCompaction() throws Exception { // force retry revokeTimedoutWorkers(conf); - compactTxn = compactInTxn(rqst); - addBaseFile(t, null, 25L, 25, compactTxn); + long compactTxn2 = compactInTxn(rqst); + addBaseFile(t, null, 25L, 25, compactTxn2); startCleaner(); - // Validate that the cleanup attempt has failed. + // Validate that the cleanup attempt was skipped. rsp = txnHandler.showCompact(new ShowCompactRequest()); assertEquals(1, rsp.getCompactsSize()); - assertEquals(FAILED_RESPONSE, rsp.getCompacts().getFirst().getState()); - assertEquals("txnid:26 is open and <= hwm: 27", rsp.getCompacts().getFirst().getErrorMessage()); + assertEquals(CLEANING_RESPONSE, rsp.getCompacts().getFirst().getState()); // Check that the files are not removed List paths = getDirectories(conf, t, null); assertEquals(6, paths.size()); + + // Abort the open compaction txn, so that the Cleaner can proceed. + txnHandler.abortTxns( + new AbortTxnsRequest(Collections.singletonList(compactTxn1))); + startCleaner(); + + rsp = txnHandler.showCompact(new ShowCompactRequest()); + assertEquals(1, rsp.getCompactsSize()); + assertEquals(SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState()); + + // Check that the files are removed + paths = getDirectories(conf, t, null); + assertEquals(1, paths.size()); } private static void revokeTimedoutWorkers(Configuration conf) throws Exception { @@ -121,39 +137,88 @@ private static void revokeTimedoutWorkers(Configuration conf) throws Exception { } @Test - public void cleanupAfterMajorCompactionWithQueryWaitingToLockTheSnapshot() throws Exception { - Table t = prepareTestTable(); + public void cleanupAndDanglingOpenTxnOnSameTable() throws Exception { + Table t = prepareTestTable("camtc"); CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR); long compactTxn = compactInTxn(rqst, CommitAction.MARK_COMPACTED); addBaseFile(t, null, 25L, 25, compactTxn); - // Open a query during compaction, + // Open a readerTxn during compaction, // Do not register minOpenWriteId (i.e. simulate delay locking the snapshot) - openTxn(); + long readerTxn = openTxn(); txnHandler.commitTxn(new CommitTxnRequest(compactTxn)); - startCleaner(); + Thread.sleep(MetastoreConf.getTimeVar( + conf, ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS)); - // Validate that the cleanup attempt has failed. + startCleaner(10, TimeUnit.SECONDS); + + // Validate that the cleanup attempt was delayed by retention time. ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); assertEquals(1, rsp.getCompactsSize()); - assertEquals(FAILED_RESPONSE, rsp.getCompacts().getFirst().getState()); - assertEquals("txnid:27 is open and <= hwm: 27", rsp.getCompacts().getFirst().getErrorMessage()); + assertEquals(CLEANING_RESPONSE, rsp.getCompacts().getFirst().getState()); // Check that the files are not removed List paths = getDirectories(conf, t, null); assertEquals(5, paths.size()); + + // Register minOpenWriteId for the readerTxn. + txnHandler.addWriteIdsToMinHistory(readerTxn, Map.of("default.camtc", 1L)); + + startCleaner(0, TimeUnit.SECONDS); + + // Validate that the cleanup attempt was blocked by readerTxn. + rsp = txnHandler.showCompact(new ShowCompactRequest()); + assertEquals(1, rsp.getCompactsSize()); + assertEquals(CLEANING_RESPONSE, rsp.getCompacts().getFirst().getState()); + + // Check that the files are not removed + paths = getDirectories(conf, t, null); + assertEquals(5, paths.size()); + } + + @Test + public void cleanupNotBlockedByOpenTxnOnAnotherTable() throws Exception { + Table t1 = prepareTestTable("camtc1"); + Table t2 = prepareTestTable("camtc2"); + + // Open a readerTxn on t1 and register minOpenWriteId. + long readerTxn = openTxn(); + txnHandler.addWriteIdsToMinHistory(readerTxn, Map.of("default.camtc1", 1L)); + + CompactionRequest rqstTbl1 = new CompactionRequest("default", "camtc1", CompactionType.MAJOR); + long compactTxn = compactInTxn(rqstTbl1); + addBaseFile(t1, null, 25L, 25, compactTxn); + + CompactionRequest rqstTbl2 = new CompactionRequest("default", "camtc2", CompactionType.MAJOR); + compactTxn = compactInTxn(rqstTbl2); + addBaseFile(t2, null, 25L, 25, compactTxn); + + startCleaner(); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + assertEquals(2, rsp.getCompactsSize()); + + assertEquals(SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + assertEquals("camtc2", rsp.getCompacts().get(0).getTablename()); + // camtc2 was cleaned: only the new base remains. + assertEquals(1, getDirectories(conf, t2, null).size()); + + assertEquals(CLEANING_RESPONSE, rsp.getCompacts().get(1).getState()); + assertEquals("camtc1", rsp.getCompacts().get(1).getTablename()); + // camtc1 wasn't actually cleaned (admission filter held it back). + assertEquals(5, getDirectories(conf, t1, null).size()); } - private Table prepareTestTable() throws Exception { - Table t = newTable("default", "camtc", false); + private Table prepareTestTable(String tblName) throws Exception { + Table t = newTable("default", tblName, false); addBaseFile(t, null, 20L, 20); addDeltaFile(t, null, 21L, 22L, 2); addDeltaFile(t, null, 23L, 24L, 2); addDeltaFile(t, null, 25L, 25, 2); - burnThroughTransactions("default", "camtc", 25); + burnThroughTransactions("default", tblName, 25); return t; } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index aa613aa8192d..5c5ca7c7bfad 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -79,33 +79,28 @@ public static ValidTxnList createValidTxnListForCleaner(GetOpenTxnsResponse txns long highWatermark = minOpenTxn - 1; long[] exceptions = new long[txns.getOpen_txnsSize()]; BitSet abortedBits = BitSet.valueOf(txns.getAbortedBits()); - int i = 0; + int i = 0, j = 0; for (long txnId : txns.getOpen_txns()) { if (txnId > highWatermark) { break; } - if (abortedBits.get(i)) { - exceptions[i] = txnId; + if (abortedBits.get(i) || isAbortCleanup) { + exceptions[j++] = txnId; + } else if (!TxnHandler.ConfVars.useMinHistoryWriteId()) { + throw new IllegalStateException( + JavaUtils.txnIdToString(txnId) + " is open and <= hwm: " + highWatermark); } else { - if (isAbortCleanup) { - exceptions[i] = txnId; - } else { - throw new IllegalStateException( - JavaUtils.txnIdToString(txnId) + " is open and <= hwm: " + highWatermark); - } + LOG.debug("Ignoring open txn {} <= hwm: {}", txnId, highWatermark); } ++i; } - exceptions = Arrays.copyOf(exceptions, i); + exceptions = Arrays.copyOf(exceptions, j); + + BitSet bitSet = isAbortCleanup ? abortedBits : new BitSet(j); if (!isAbortCleanup) { - BitSet bitSet = new BitSet(exceptions.length); - bitSet.set(0, exceptions.length); - //add ValidCleanerTxnList? - could be problematic for all the places that read it from - // string as they'd have to know which object to instantiate - return new ValidReadTxnList(exceptions, bitSet, highWatermark, Long.MAX_VALUE); - } else { - return new ValidReadTxnList(exceptions, abortedBits, highWatermark, Long.MAX_VALUE); + bitSet.set(0, j); } + return new ValidReadTxnList(exceptions, bitSet, highWatermark, Long.MAX_VALUE); } /**