From c84bc0abf4a7318b8d9f6687791c6cf98cbef1cc Mon Sep 17 00:00:00 2001 From: Marta Kuczora Date: Tue, 12 May 2026 10:45:34 +0200 Subject: [PATCH 1/2] HIVE-29572: ACID Compaction: Cleaner should check the state of the compaction txn before start cleaning --- .../service/AcidCompactionService.java | 4 + .../hive/ql/txn/compactor/CompactorTest.java | 8 + .../hive/ql/txn/compactor/TestCleaner.java | 144 ++++++++++++++++- .../TestCleanerWithMinHistoryWriteId.java | 147 ++++++++++-------- .../hadoop/hive/metastore/txn/TxnUtils.java | 4 +- .../txn/jdbc/functions/AbortTxnsFunction.java | 21 +++ .../txn/jdbc/queries/ReadyToCleanHandler.java | 4 + 7 files changed, 261 insertions(+), 71 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java index 3538b07c1696..ff3361f342ef 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.txn.compactor.service; +import com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -198,6 +199,9 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception { txnWriteIds.addTableValidWriteIdList(tblValidWriteIds); conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, txnWriteIds.toString()); + msc.addWriteIdsToMinHistory(compactionTxn.getTxnId(), + ImmutableMap.of(fullTableName, txnWriteIds.getMinOpenWriteId(fullTableName))); + ci.highestWriteId = tblValidWriteIds.getHighWatermark(); //this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we keep metadata about //it until after any data written by it are physically removed diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java index 322045b0dadd..593e460fc2c5 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; +import com.google.common.collect.ImmutableMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -26,6 +27,7 @@ import org.apache.hadoop.hive.common.ServerUtils; import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidTxnWriteIdList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; @@ -769,6 +771,12 @@ long compactInTxn(CompactionRequest rqst, CommitAction commitAction) throws Exce txnHandler.getValidWriteIds(writeIdsRequest).getTblValidWriteIds() .getFirst()); + final ValidTxnWriteIdList txnWriteIds = new ValidTxnWriteIdList(compactorTxnId); + txnWriteIds.addTableValidWriteIdList(tblValidWriteIds); + + txnHandler.addWriteIdsToMinHistory(compactorTxnId, + ImmutableMap.of(ci.getFullTableName(), txnWriteIds.getMinOpenWriteId(ci.getFullTableName()))); + ci.highestWriteId = tblValidWriteIds.getHighWatermark(); txnHandler.updateCompactorState(ci, compactorTxnId); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java index df73adf26ed3..a33ea9eab451 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java @@ -17,10 +17,12 @@ */ package 
org.apache.hadoop.hive.ql.txn.compactor; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionResponse; @@ -36,6 +38,7 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.testutil.TxnStoreHelper; import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandler; @@ -63,6 +66,10 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED; import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME; import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar; +import static org.apache.hadoop.hive.metastore.txn.TxnStore.CLEANING_RESPONSE; +import static org.apache.hadoop.hive.metastore.txn.TxnStore.INITIATED_STATE; +import static org.apache.hadoop.hive.metastore.txn.TxnStore.WORKING_RESPONSE; +import static org.apache.hadoop.hive.metastore.txn.TxnStore.WORKING_STATE; import static org.apache.hadoop.hive.ql.io.AcidUtils.addVisibilitySuffix; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -1347,7 +1354,118 @@ public void testCompactionHwmIsHonoredWithMinOpenWriteIdSetAndAbortedIOW() throw } } - private String createDeltasAndRunMajorCompaction(Table table, long minTxnId, int 
numberOfDeltas) throws Exception { + @Test + public void testCleanerRunsWithOpenCompactionTxn() throws Exception { + MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS, 0L); + + String dbName = "default"; + String tblName = "campcnb"; + Table t = newTable(dbName, tblName, false); + addDeltaFile(t, null, 1L, 1L, 1); + addDeltaFile(t, null, 2L, 2L, 1); + addDeltaFile(t, null, 3L, 3L, 1); + addDeltaFile(t, null, 4L, 4L, 1); + burnThroughTransactions(dbName, tblName, 4, null, null); + + // trigger compaction + CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MAJOR); + long txnId = compactInTxn(rqst, CommitAction.MARK_COMPACTED); + addBaseFile(t, null, 4L, 6, txnId); + String deltaName1 = "base_4_v0000005"; + + // should not clean anything since the compaction txn is still open + startCleaner(); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + assertEquals(1, rsp.getCompactsSize()); + assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState()); + + String deltaName2 = createDeltasAndRunMajorCompaction(t, 5, 2); + + // should not clean anything since the compaction txn is still open + startCleaner(); + + rsp = txnHandler.showCompact(new ShowCompactRequest()); + assertEquals(2, rsp.getCompactsSize()); + assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState()); + assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(1).getState()); + + String deltaName3 = createDeltasAndRunMajorCompaction(t, 7, 2); + + //Abort the compaction txn + txnHandler.abortTxns(new AbortTxnsRequest(Collections.singletonList(txnId))); + Thread.sleep(10000L); + + Set expectedDirs = new HashSet<>(); + expectedDirs.add(deltaName1); + expectedDirs.add(deltaName2); + expectedDirs.add(deltaName3); + for (int i = 1; i < 9; i++) { + expectedDirs.add(makeDeltaDirName(i, i)); + } + verifyDirectories(t, expectedDirs); + + // Should find the second compaction 
+ startCleaner(); + expectedDirs.remove(deltaName1); + for (int i = 1; i < 7; i++) { + expectedDirs.remove(makeDeltaDirName(i, i)); + } + verifyDirectories(t, expectedDirs); + // Should find the third compaction and delete the directories accordingly + startCleaner(); + expectedDirs.remove(deltaName2); + for (int i = 7; i < 9; i++) { + expectedDirs.remove(makeDeltaDirName(i, i)); + } + verifyDirectories(t, expectedDirs); + + rsp = txnHandler.showCompact(new ShowCompactRequest()); + assertEquals(3, rsp.getCompactsSize()); + assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(1).getState()); + assertEquals(TxnStore.FAILED_RESPONSE, rsp.getCompacts().get(2).getState()); + assertEquals("The txn (id=" + txnId + ") of this compaction got aborted.", + rsp.getCompacts().get(2).getErrorMessage()); + } + + @Test + public void cleanupAfterKilledAndRetriedMajorCompaction() throws Exception { + Table t = prepareTestTable(); + CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR); + long openCompactTxn = compactInTxn(rqst, CommitAction.NONE); + addBaseFile(t, null, 25L, 25, openCompactTxn); + + txnHandler.revokeTimedoutWorkers(1L); + // an open txn should prevent the retry + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + assertEquals(1, rsp.getCompactsSize()); + assertEquals(WORKING_RESPONSE, rsp.getCompacts().getFirst().getState()); + + // force retry + revokeTimedoutWorkers(conf); + long compactTxn = compactInTxn(rqst); + addBaseFile(t, null, 25L, 25, compactTxn); + + startCleaner(); + + // Validate that the compaction is still waiting in the cleaning state. 
+ rsp = txnHandler.showCompact(new ShowCompactRequest()); + assertEquals(1, rsp.getCompactsSize()); + assertEquals(CLEANING_RESPONSE, rsp.getCompacts().getFirst().getState()); + // Check that the files are not removed + List paths = getDirectories(conf, t, null); + assertEquals(6, paths.size()); + + txnHandler.abortTxns(new AbortTxnsRequest(Collections.singletonList(openCompactTxn))); + startCleaner(); + // Check that the obsolete files are removed once the open compaction txn is aborted + paths = getDirectories(conf, t, null); + assertEquals(1, paths.size()); + } + + protected String createDeltasAndRunMajorCompaction(Table table, long minTxnId, int numberOfDeltas) + throws Exception { String dbName = table.getDbName(); String tableName = table.getTableName(); for (int i = 0; i < numberOfDeltas; i++) { @@ -1361,10 +1479,30 @@ private String createDeltasAndRunMajorCompaction(Table table, long minTxnId, int return AcidUtils.addVisibilitySuffix(AcidUtils.BASE_PREFIX + maxTxnId, compactTxn); } - private void verifyDirectories(Table table, Set expectedDirs) throws Exception { + protected void verifyDirectories(Table table, Set expectedDirs) throws Exception { List paths = getDirectories(conf, table, null); Set actualDirs = paths.stream().map(Path::getName).collect(Collectors.toSet()); Assert.assertEquals(expectedDirs, actualDirs); } -} \ No newline at end of file + private static void revokeTimedoutWorkers(Configuration conf) throws Exception { + TestTxnDbUtil.executeUpdate(conf, """ + UPDATE "COMPACTION_QUEUE" + SET "CQ_WORKER_ID" = NULL, "CQ_START" = NULL, "CQ_STATE" = '%c' + WHERE "CQ_STATE" = '%c' + """.formatted(INITIATED_STATE, WORKING_STATE)); + } + + protected Table prepareTestTable() throws Exception { + Table t = newTable("default", "camtc", false); + + addBaseFile(t, null, 20L, 20); + addDeltaFile(t, null, 21L, 22L, 2); + addDeltaFile(t, null, 23L, 24L, 2); + addDeltaFile(t, null, 25L, 25, 2); + + burnThroughTransactions("default", "camtc", 25); + return t; + } + +} diff 
--git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java index 8b1216335acd..fcc6ffe23f72 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java @@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; @@ -29,11 +30,16 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; +import org.apache.hadoop.hive.ql.testutil.TxnStoreHelper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import static org.apache.hadoop.hive.metastore.txn.TxnStore.CLEANING_RESPONSE; import static org.apache.hadoop.hive.metastore.txn.TxnStore.FAILED_RESPONSE; import static org.apache.hadoop.hive.metastore.txn.TxnStore.SUCCEEDED_RESPONSE; import static org.apache.hadoop.hive.metastore.txn.TxnStore.WORKING_RESPONSE; @@ -81,80 +87,89 @@ public void cleanupAfterAbortedAndRetriedMajorCompaction() throws Exception { assertEquals(addVisibilitySuffix("base_25", 27), paths.getFirst().getName()); } - @Test - public void cleanupAfterKilledAndRetriedMajorCompaction() throws Exception { - Table t = prepareTestTable(); - CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR); - long compactTxn = compactInTxn(rqst, CommitAction.NONE); - addBaseFile(t, null, 
25L, 25, compactTxn); - - txnHandler.revokeTimedoutWorkers(1L); - // an open txn should prevent the retry - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - assertEquals(1, rsp.getCompactsSize()); - assertEquals(WORKING_RESPONSE, rsp.getCompacts().getFirst().getState()); - - // force retry - revokeTimedoutWorkers(conf); - compactTxn = compactInTxn(rqst); - addBaseFile(t, null, 25L, 25, compactTxn); - - startCleaner(); - - // Validate that the cleanup attempt has failed. - rsp = txnHandler.showCompact(new ShowCompactRequest()); - assertEquals(1, rsp.getCompactsSize()); - assertEquals(FAILED_RESPONSE, rsp.getCompacts().getFirst().getState()); - assertEquals("txnid:26 is open and <= hwm: 27", rsp.getCompacts().getFirst().getErrorMessage()); - - // Check that the files are not removed - List paths = getDirectories(conf, t, null); - assertEquals(6, paths.size()); - } - - private static void revokeTimedoutWorkers(Configuration conf) throws Exception { - TestTxnDbUtil.executeUpdate(conf, """ - UPDATE "COMPACTION_QUEUE" - SET "CQ_WORKER_ID" = NULL, "CQ_START" = NULL, "CQ_STATE" = '%c' - WHERE "CQ_STATE" = '%c' - """.formatted(INITIATED_STATE, WORKING_STATE)); - } +// @Test +// public void cleanupAfterMajorCompactionWithQueryWaitingToLockTheSnapshot() throws Exception { +// Table t = prepareTestTable(); +// CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR); +// long compactTxn = compactInTxn(rqst, CommitAction.MARK_COMPACTED); +// addBaseFile(t, null, 25L, 25, compactTxn); +// +// // Open a query during compaction, +// // Do not register minOpenWriteId (i.e. simulate delay locking the snapshot) +// openTxn(); +// +// txnHandler.commitTxn(new CommitTxnRequest(compactTxn)); +// startCleaner(); +// +// // Validate that the cleanup attempt has failed. 
+// ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); +// assertEquals(1, rsp.getCompactsSize()); +// assertEquals(CLEANING_RESPONSE, rsp.getCompacts().getFirst().getState()); +// +// // Check that the files are not removed +// List paths = getDirectories(conf, t, null); +// assertEquals(5, paths.size()); +// } @Test - public void cleanupAfterMajorCompactionWithQueryWaitingToLockTheSnapshot() throws Exception { - Table t = prepareTestTable(); - CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR); - long compactTxn = compactInTxn(rqst, CommitAction.MARK_COMPACTED); - addBaseFile(t, null, 25L, 25, compactTxn); - - // Open a query during compaction, - // Do not register minOpenWriteId (i.e. simulate delay locking the snapshot) - openTxn(); - - txnHandler.commitTxn(new CommitTxnRequest(compactTxn)); + public void testCleanerBlockedByGlobalTxn() throws Exception { + MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS, 0L); + + String dbName = "default"; + String tblName1 = "campcnb1"; + String tblName2 = "campcnb2"; + + // Create table 1, insert 4 deltas and run a MAJOR compaction + Table t1 = newTable(dbName, tblName1, false); + addDeltaFile(t1, null, 1L, 1L, 1); + burnThroughTransactions(dbName, tblName1, 1, null, Collections.singleton(1L)); + String deltaName1 = createDeltasAndRunMajorCompaction(t1, 2, 3); + startCleaner(); + Set expectedDirs = new HashSet<>(); + expectedDirs.add(deltaName1); + verifyDirectories(t1, expectedDirs); + + // Create table 2 and insert 4 deltas + Table t2 = newTable(dbName, tblName2, false); + addDeltaFile(t2, null, 1L, 1L, 1); + addDeltaFile(t2, null, 2L, 2L, 1); + addDeltaFile(t2, null, 3L, 3L, 1); + addDeltaFile(t2, null, 4L, 4L, 1); + burnThroughTransactions(dbName, tblName2, 4, null, null); + + // Open a txn for table2 + long longQuery = openTxn(); + TxnStoreHelper.wrap(txnHandler) + .registerMinOpenWriteId("default", tblName2, 
longQuery); + + // Insert two deltas to table 1 and run a MAJOR compaction + String deltaName2 = createDeltasAndRunMajorCompaction(t1, 5, 2); + // The cleaner should not be blocked by the txn on table 2 and should delete the deltas startCleaner(); - // Validate that the cleanup attempt has failed. ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - assertEquals(1, rsp.getCompactsSize()); - assertEquals(FAILED_RESPONSE, rsp.getCompacts().getFirst().getState()); - assertEquals("txnid:27 is open and <= hwm: 27", rsp.getCompacts().getFirst().getErrorMessage()); - - // Check that the files are not removed - List paths = getDirectories(conf, t, null); - assertEquals(5, paths.size()); - } + assertEquals(2, rsp.getCompactsSize()); + assertEquals(SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + assertEquals(SUCCEEDED_RESPONSE, rsp.getCompacts().get(1).getState()); - private Table prepareTestTable() throws Exception { - Table t = newTable("default", "camtc", false); + expectedDirs.add(deltaName2); + for (int i = 5; i < 7; i++) { + expectedDirs.remove(makeDeltaDirName(i, i)); + } - addBaseFile(t, null, 20L, 20); - addDeltaFile(t, null, 21L, 22L, 2); - addDeltaFile(t, null, 23L, 24L, 2); - addDeltaFile(t, null, 25L, 25, 2); + txnHandler.commitTxn(new CommitTxnRequest(longQuery)); - burnThroughTransactions("default", "camtc", 25); - return t; + String deltaName3 = createDeltasAndRunMajorCompaction(t1, 7, 2); + startCleaner(); + expectedDirs.add(deltaName3); + for (int i = 7; i < 9; i++) { + expectedDirs.remove(makeDeltaDirName(i, i)); + } + rsp = txnHandler.showCompact(new ShowCompactRequest()); + assertEquals(3, rsp.getCompactsSize()); + assertEquals(SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + assertEquals(SUCCEEDED_RESPONSE, rsp.getCompacts().get(1).getState()); + assertEquals(SUCCEEDED_RESPONSE, rsp.getCompacts().get(2).getState()); } } diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index aa613aa8192d..c0a3f84306b0 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -90,8 +90,8 @@ public static ValidTxnList createValidTxnListForCleaner(GetOpenTxnsResponse txns if (isAbortCleanup) { exceptions[i] = txnId; } else { - throw new IllegalStateException( - JavaUtils.txnIdToString(txnId) + " is open and <= hwm: " + highWatermark); +// throw new IllegalStateException( +// JavaUtils.txnIdToString(txnId) + " is open and <= hwm: " + highWatermark); } } ++i; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/AbortTxnsFunction.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/AbortTxnsFunction.java index f81df0eacaa4..3d2aa8bd1a3a 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/AbortTxnsFunction.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/AbortTxnsFunction.java @@ -25,10 +25,15 @@ import org.apache.hadoop.hive.metastore.metrics.Metrics; import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; import org.apache.hadoop.hive.metastore.txn.TxnErrorMsg; +import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; +import org.apache.hadoop.hive.metastore.txn.entities.CompactionState; import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.metastore.txn.jdbc.commands.InsertCompactionInfoCommand; import 
org.apache.hadoop.hive.metastore.txn.jdbc.commands.RemoveTxnsFromMinHistoryLevelCommand; import org.apache.hadoop.hive.metastore.txn.jdbc.commands.RemoveWriteIdsFromMinHistoryCommand; +import org.apache.hadoop.hive.metastore.txn.jdbc.queries.DbTimeHandler; +import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetCompactionInfoHandler; import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetDatabaseIdHandler; import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource; import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionalFunction; @@ -36,6 +41,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jdbc.UncategorizedSQLException; +import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; import java.sql.Connection; import java.sql.PreparedStatement; @@ -164,6 +170,21 @@ public Integer execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce Metrics.getOrCreateCounter(MetricsConstants.TOTAL_NUM_ABORTED_TXNS).inc(txnids.size()); } LOG.warn("Aborted {} transaction(s) {} due to {}", txnids.size(), txnids, txnErrorMsg); + + for (long abortedTxnId: txnids) { + CompactionInfo ci = jdbcResource.execute(new GetCompactionInfoHandler(abortedTxnId, true)); + if (ci != null && CompactionState.fromSqlConst(ci.state) == CompactionState.READY_FOR_CLEANING) { + LOG.info("Going to mark the compaction {} as failed, as its txn got aborted.", ci); + jdbcResource.execute("DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = :id", + new MapSqlParameterSource("id", ci.id), null); + long endTime = jdbcResource.execute(new DbTimeHandler()).getTime(); + ci.state = 'f'; + ci.errorMessage = "The txn (id=" + abortedTxnId + ") of this compaction got aborted."; + jdbcResource.execute(new InsertCompactionInfoCommand(ci, endTime)); + LOG.info("Finished marking compaction {} as failed", ci); + } + } + return numAborted; } catch (SQLException e) { throw new UncategorizedSQLException(null, null, e); diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java index 1e1ea51420c9..c3f733c0b09f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnHandler; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus; import org.apache.hadoop.hive.metastore.txn.jdbc.QueryHandler; import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.namedparam.SqlParameterSource; @@ -93,6 +94,9 @@ public String getParameterizedQueryString(DatabaseProduct databaseProduct) throw } else if (minOpenTxnWaterMark > 0) { whereClause += " AND (\"CQ_NEXT_TXN_ID\" <= " + minOpenTxnWaterMark + " OR \"CQ_NEXT_TXN_ID\" IS NULL)"; } + + queryStr += " LEFT OUTER JOIN \"TXNS\" ON \"cq1\".\"CQ_TXN_ID\" = \"TXN_ID\""; + whereClause += " AND (\"CQ_TXN_ID\" IS NULL OR \"TXN_STATE\" != " + TxnStatus.OPEN + ")"; queryStr += whereClause + " ORDER BY \"CQ_ID\""; queryStr = databaseProduct.addLimitClause(fetchSize, queryStr); From 529e2cb1a2f3e4bdd24b2df9bb2c2b359c33fb22 Mon Sep 17 00:00:00 2001 From: Marta Kuczora Date: Tue, 19 May 2026 11:08:03 +0200 Subject: [PATCH 2/2] HIVE-29572: ACID Compaction: Cleaner should check the state of the compaction txn before start cleaning - different approach --- .../txn/compactor/handler/CompactionCleaner.java | 12 ++++++++++++ .../hadoop/hive/ql/txn/compactor/TestCleaner.java | 5 ++++- .../hadoop/hive/metastore/txn/TxnHandler.java | 9 
+++++++-- .../hadoop/hive/metastore/txn/TxnStore.java | 5 +++++ .../txn/jdbc/functions/AbortTxnsFunction.java | 15 --------------- .../txn/jdbc/queries/ReadyToCleanHandler.java | 7 ++++--- 6 files changed, 32 insertions(+), 21 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java index 2a593e4927f7..00b4d0f5855f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest; import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest.CleanupRequestBuilder; @@ -97,6 +98,17 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t LOG.info("Starting cleaning for {}, based on min open {}", ci, (ci.minOpenWriteId > 0) ? "writeId: " + ci.minOpenWriteId : "txnId: " + minOpenTxn); + if (ci.nextTxnId == 0 && ci.txnId > 0) { // nextTxnId reads as 0 when CQ_NEXT_TXN_ID is NULL + TxnStatus status = txnHandler.getTransactionStatus(ci.txnId); + if (TxnStatus.ABORTED == status) { + LOG.warn("The compaction {} is in invalid state. The compaction is marked as 'ready for cleaning', " + + "but its txn is in aborted state. Marking this compaction as failed.", ci); + ci.errorMessage = "Invalid state: the compaction txn (" + ci.txnId + ") is already aborted."; + txnHandler.markFailed(ci); + return; + } + } + PerfLogger perfLogger = PerfLogger.getPerfLogger(false); String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" + (!isNull(ci.type) ? 
ci.type.toString().toLowerCase() : null); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java index a33ea9eab451..3c252650dd0e 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java @@ -1404,6 +1404,9 @@ public void testCleanerRunsWithOpenCompactionTxn() throws Exception { expectedDirs.add(makeDeltaDirName(i, i)); } verifyDirectories(t, expectedDirs); + // Should mark the compaction 1 failed as its txn is aborted + startCleaner(); + verifyDirectories(t, expectedDirs); // Should find the second compaction startCleaner(); @@ -1425,7 +1428,7 @@ public void testCleanerRunsWithOpenCompactionTxn() throws Exception { assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(1).getState()); assertEquals(TxnStore.FAILED_RESPONSE, rsp.getCompacts().get(2).getState()); - assertEquals("The txn (id=" + txnId + ") of this compaction got aborted.", + assertEquals("Invalid state: the compaction txn (" + txnId + ") is already aborted.", rsp.getCompacts().get(2).getErrorMessage()); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index a847e01aca4d..b7c81f8237cf 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -1100,8 +1100,13 @@ public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throw private static void shouldNeverHappen(long txnid) { throw new RuntimeException("This should never happen: " + 
JavaUtils.txnIdToString(txnid)); - } - + } + + public TxnStatus getTransactionStatus(long txnId) throws MetaException { + TxnStatus status = jdbcResource.execute(new FindTxnStateHandler(txnId)); + return status; + } + private void deleteInvalidOpenTransactions(List txnIds) throws MetaException { try { sqlRetryHandler.executeWithRetry(new SqlRetryCallProperties().withCallerId("deleteInvalidOpenTransactions"), diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 22e6c279fc84..bcb443a99790 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -78,6 +78,7 @@ import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.entities.CompactionMetricsData; import org.apache.hadoop.hive.metastore.txn.entities.MetricsInfo; +import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus; import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource; import org.apache.hadoop.hive.metastore.txn.retry.SqlRetry; import org.apache.hadoop.hive.metastore.txn.retry.SqlRetryException; @@ -336,6 +337,10 @@ GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst) @RetrySemantics.SafeToRetry void addWriteIdsToMinHistory(long txnId, Map minOpenWriteIds) throws MetaException; + @SqlRetry(lockInternally = true, retryOnDuplicateKey = true) + @Transactional(POOL_TX) + public TxnStatus getTransactionStatus(long txnId) throws MetaException; + /** * Allocate a write ID for the given table and associate it with a transaction * @param rqst info on transaction and table to allocate write id diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/AbortTxnsFunction.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/AbortTxnsFunction.java index 3d2aa8bd1a3a..9bb1ee1608d3 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/AbortTxnsFunction.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/AbortTxnsFunction.java @@ -170,21 +170,6 @@ public Integer execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce Metrics.getOrCreateCounter(MetricsConstants.TOTAL_NUM_ABORTED_TXNS).inc(txnids.size()); } LOG.warn("Aborted {} transaction(s) {} due to {}", txnids.size(), txnids, txnErrorMsg); - - for (long abortedTxnId: txnids) { - CompactionInfo ci = jdbcResource.execute(new GetCompactionInfoHandler(abortedTxnId, true)); - if (ci != null && CompactionState.fromSqlConst(ci.state) == CompactionState.READY_FOR_CLEANING) { - LOG.info("Going to mark the compaction {} as failed, as its txn got aborted.", ci); - jdbcResource.execute("DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = :id", - new MapSqlParameterSource("id", ci.id), null); - long endTime = jdbcResource.execute(new DbTimeHandler()).getTime(); - ci.state = 'f'; - ci.errorMessage = "The txn (id=" + abortedTxnId + ") of this compaction got aborted."; - jdbcResource.execute(new InsertCompactionInfoCommand(ci, endTime)); - LOG.info("Finished marking compaction {} as failed", ci); - } - } - return numAborted; } catch (SQLException e) { throw new UncategorizedSQLException(null, null, e); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java index 
c3f733c0b09f..7bc924d0a094 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java @@ -63,7 +63,7 @@ public String getParameterizedQueryString(DatabaseProduct databaseProduct) throw String queryStr = " \"CQ_ID\", \"cq1\".\"CQ_DATABASE\", \"cq1\".\"CQ_TABLE\", \"cq1\".\"CQ_PARTITION\"," + " \"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_TBLPROPERTIES\", \"CQ_RETRY_RETENTION\", " + - " \"CQ_NEXT_TXN_ID\""; + " \"CQ_TXN_ID\", \"CQ_NEXT_TXN_ID\""; if (TxnHandler.ConfVars.useMinHistoryWriteId()) { queryStr += ", \"MIN_OPEN_WRITE_ID\""; } @@ -122,9 +122,10 @@ public List extractData(ResultSet rs) throws SQLException, DataA info.highestWriteId = rs.getLong(7); info.properties = rs.getString(8); info.retryRetention = rs.getInt(9); - info.nextTxnId = rs.getLong(10); + info.txnId = rs.getLong(10); + info.nextTxnId = rs.getLong(11); if (TxnHandler.ConfVars.useMinHistoryWriteId()) { - long value = rs.getLong(11); + long value = rs.getLong(12); info.minOpenWriteId = !rs.wasNull() ? value : Long.MAX_VALUE; } infos.add(info);