Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class MetastoreLock implements HiveLock {

private final ClientPool<IMetaStoreClient, TException> metaClients;

private final String catalogName;
private final String databaseName;
private final String tableName;
private final String fullName;
Expand All @@ -100,6 +101,7 @@ public MetastoreLock(Configuration conf, ClientPool<IMetaStoreClient, TException
String catalogName, String databaseName, String tableName) {
this.metaClients = metaClients;
this.fullName = catalogName + "." + databaseName + "." + tableName;
this.catalogName = catalogName;
this.databaseName = databaseName;
this.tableName = tableName;

Expand Down Expand Up @@ -179,8 +181,8 @@ public void unlock() {
@SuppressWarnings("checkstyle:CyclomaticComplexity")
private long acquireLock() throws LockException {
if (hmsLockId.isPresent()) {
throw new IllegalArgumentException(String.format("HMS lock ID=%s already acquired for table %s.%s",
hmsLockId.get(), databaseName, tableName));
throw new IllegalArgumentException(String.format("HMS lock ID=%s already acquired for table %s.%s.%s",
hmsLockId.get(), catalogName, databaseName, tableName));
}
LockInfo lockInfo = createLock();

Expand Down Expand Up @@ -212,12 +214,13 @@ private long acquireLock() throws LockException {
lockInfo.lockState = newState;
if (newState.equals(LockState.WAITING)) {
throw new WaitingForLockException(String.format(
"Waiting for lock on table %s.%s", databaseName, tableName));
"Waiting for lock on table %s.%s.%s", catalogName, databaseName, tableName));
}
} catch (InterruptedException e) {
Thread.interrupted(); // Clear the interrupt status flag
LOG.warn(
"Interrupted while waiting for lock on table {}.{}",
"Interrupted while waiting for lock on table {}.{}.{}",
catalogName,
databaseName,
tableName,
e);
Expand All @@ -239,19 +242,19 @@ private long acquireLock() throws LockException {
if (!lockInfo.lockState.equals(LockState.ACQUIRED)) {
// timeout and do not have lock acquired
if (timeout) {
throw new LockException("Timed out after %s ms waiting for lock on %s.%s",
duration, databaseName, tableName);
throw new LockException("Timed out after %s ms waiting for lock on %s.%s.%s",
duration, catalogName, databaseName, tableName);
}

if (thriftError != null) {
throw new LockException(
thriftError, "Metastore operation failed for %s.%s", databaseName, tableName);
thriftError, "Metastore operation failed for %s.%s.%s", catalogName, databaseName, tableName);
}

// Just for safety. We should not get here.
throw new LockException(
"Could not acquire the lock on %s.%s, lock request ended in state %s",
databaseName, tableName, lockInfo.lockState);
"Could not acquire the lock on %s.%s.%s, lock request ended in state %s",
catalogName, databaseName, tableName, lockInfo.lockState);
} else {
return lockInfo.lockId;
}
Expand All @@ -277,6 +280,7 @@ private LockInfo createLock() throws LockException {

LockComponent lockComponent =
new LockComponent(LockType.EXCL_WRITE, LockLevel.TABLE, databaseName);
lockComponent.setCatName(catalogName);
lockComponent.setTablename(tableName);
LockRequest lockRequest =
new LockRequest(
Expand Down Expand Up @@ -318,27 +322,32 @@ private LockInfo createLock() throws LockException {
}
}

throw new LockException("Failed to find lock for table %s.%s", databaseName, tableName);
throw new LockException("Failed to find lock for table %s.%s.%s", catalogName, databaseName,
tableName);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
interrupted.set(true);
LOG.warn(
"Interrupted while trying to find lock for table {}.{}", databaseName, tableName, e);
"Interrupted while trying to find lock for table {}.{}.{}", catalogName, databaseName,
tableName, e);
throw new LockException(
e, "Interrupted while trying to find lock for table %s.%s", databaseName, tableName);
e, "Interrupted while trying to find lock for table %s.%s.%s", catalogName, databaseName,
tableName);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
interrupted.set(true);
LOG.warn("Interrupted while creating lock on table {}.{}", databaseName, tableName, e);
LOG.warn("Interrupted while creating lock on table {}.{}.{}", catalogName, databaseName,
tableName, e);
throw new LockException(
e, "Interrupted while creating lock on table %s.%s", databaseName, tableName);
e, "Interrupted while creating lock on table %s.%s.%s", catalogName, databaseName,
tableName);
}
},
LockException.class);

// This should be initialized always, or exception should be thrown.
LOG.debug("Lock {} created for table {}.{}", lockInfo, databaseName, tableName);
LOG.debug("Lock {} created for table {}.{}.{}", lockInfo, catalogName, databaseName, tableName);
return lockInfo;
}

Expand All @@ -354,13 +363,14 @@ private LockInfo findLock() throws LockException, InterruptedException {
HiveVersion.min(HiveVersion.HIVE_2),
"Minimally Hive 2 HMS client is needed to find the Lock using the showLocks API call");
ShowLocksRequest showLocksRequest = new ShowLocksRequest();
showLocksRequest.setCatname(catalogName);
showLocksRequest.setDbname(databaseName);
showLocksRequest.setTablename(tableName);
ShowLocksResponse response;
try {
response = metaClients.run(client -> client.showLocks(showLocksRequest));
} catch (TException e) {
throw new LockException(e, "Failed to find lock for table %s.%s", databaseName, tableName);
throw new LockException(e, "Failed to find lock for table %s.%s.%s", catalogName, databaseName, tableName);
}
for (ShowLocksResponseElement lock : response.getLocks()) {
if (lock.getAgentInfo().equals(agentInfo)) {
Expand Down Expand Up @@ -403,19 +413,22 @@ private void unlock(Optional<Long> lockId) {
// Interrupted unlock. We try to unlock one more time if we have a lockId
try {
Thread.interrupted(); // Clear the interrupt status flag for now, so we can retry unlock
LOG.warn("Interrupted unlock we try one more time {}.{}", databaseName, tableName, ie);
LOG.warn("Interrupted unlock we try one more time {}.{}.{}", catalogName, databaseName,
tableName, ie);
doUnlock(id);
} catch (Exception e) {
LOG.warn("Failed to unlock even on 2nd attempt {}.{}", databaseName, tableName, e);
LOG.warn("Failed to unlock even on 2nd attempt {}.{}.{}", catalogName, databaseName,
tableName, e);
} finally {
Thread.currentThread().interrupt(); // Set back the interrupt status
}
} else {
Thread.currentThread().interrupt(); // Set back the interrupt status
LOG.warn("Interrupted finding locks to unlock {}.{}", databaseName, tableName, ie);
LOG.warn("Interrupted finding locks to unlock {}.{}.{}", catalogName, databaseName,
tableName, ie);
}
} catch (Exception e) {
LOG.warn("Failed to unlock {}.{}", databaseName, tableName, e);
LOG.warn("Failed to unlock {}.{}.{}", catalogName, databaseName, tableName, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ public void testUnLockAfterInterruptedLock() throws TException {
.isInstanceOf(RuntimeException.class)
.hasMessage(
"org.apache.iceberg.hive.LockException: " +
"Interrupted while creating lock on table hivedb.tbl");
"Interrupted while creating lock on table hive.hivedb.tbl");

verify(spyClient, times(1)).unlock(eq(dummyLockId));
// Make sure that we exit the lock loop on InterruptedException
Expand All @@ -393,7 +393,7 @@ public void testUnLockAfterInterruptedLockCheck() throws TException {
.isInstanceOf(RuntimeException.class)
.hasMessage(
"org.apache.iceberg.hive.LockException: " +
"Could not acquire the lock on hivedb.tbl, lock request ended in state WAITING");
"Could not acquire the lock on hive.hivedb.tbl, lock request ended in state WAITING");

verify(spyClient, times(1)).unlock(eq(dummyLockId));
// Make sure that we exit the checkLock loop on InterruptedException
Expand Down Expand Up @@ -452,7 +452,7 @@ public void testLockFailureAtFirstTime() throws TException {
.isInstanceOf(CommitFailedException.class)
.hasMessage(
"org.apache.iceberg.hive.LockException: " +
"Could not acquire the lock on hivedb.tbl, lock request ended in state NOT_ACQUIRED");
"Could not acquire the lock on hive.hivedb.tbl, lock request ended in state NOT_ACQUIRED");
}

@Test
Expand All @@ -470,7 +470,7 @@ public void testLockFailureAfterRetries() throws TException {
.isInstanceOf(CommitFailedException.class)
.hasMessage(
"org.apache.iceberg.hive.LockException: " +
"Could not acquire the lock on hivedb.tbl, lock request ended in state NOT_ACQUIRED");
"Could not acquire the lock on hive.hivedb.tbl, lock request ended in state NOT_ACQUIRED");
}

@Test
Expand All @@ -482,7 +482,7 @@ public void testLockTimeoutAfterRetries() throws TException {
.isInstanceOf(CommitFailedException.class)
.hasMessageStartingWith("org.apache.iceberg.hive.LockException")
.hasMessageContaining("Timed out after")
.hasMessageEndingWith("waiting for lock on hivedb.tbl");
.hasMessageEndingWith("waiting for lock on hive.hivedb.tbl");
}

@Test
Expand All @@ -507,7 +507,7 @@ public void testPassThroughThriftExceptionsForHiveVersion_1()
assertThatThrownBy(() -> spyOps.doCommit(metadataV2, metadataV1))
.isInstanceOf(CommitFailedException.class)
.hasMessage(
"org.apache.iceberg.hive.LockException: Failed to find lock for table hivedb.tbl");
"org.apache.iceberg.hive.LockException: Failed to find lock for table hive.hivedb.tbl");
}
}

Expand All @@ -520,7 +520,7 @@ public void testPassThroughThriftExceptions() throws TException {
assertThatThrownBy(() -> spyOps.doCommit(metadataV2, metadataV1))
.isInstanceOf(RuntimeException.class)
.hasMessage(
"org.apache.iceberg.hive.LockException: Metastore operation failed for hivedb.tbl");
"org.apache.iceberg.hive.LockException: Metastore operation failed for hive.hivedb.tbl");
}

@Test
Expand All @@ -536,7 +536,7 @@ public void testPassThroughInterruptions() throws TException {
.isInstanceOf(CommitFailedException.class)
.hasMessage(
"org.apache.iceberg.hive.LockException: " +
"Could not acquire the lock on hivedb.tbl, lock request ended in state WAITING");
"Could not acquire the lock on hive.hivedb.tbl, lock request ended in state WAITING");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConfForTest;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class BaseReplicationScenariosAcidTables {

protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
private static final Path REPLICA_EXTERNAL_BASE = new Path("/replica_external_base");
protected static final String PRIMARY_CAT_NAME = Warehouse.DEFAULT_CATALOG_NAME;
protected static String fullyQualifiedReplicaExternalBase;
static WarehouseInstance primary;
static WarehouseInstance replica, replicaNonAcid;
Expand Down Expand Up @@ -348,8 +350,8 @@ List<Long> openTxns(int numTxns, TxnStore txnHandler, HiveConf primaryConf) thro
return txns;
}

List<Long> allocateWriteIdsForTablesAndAcquireLocks(String primaryDbName, Map<String, Long> tables,
TxnStore txnHandler,
List<Long> allocateWriteIdsForTablesAndAcquireLocks(String primaryCatName, String primaryDbName,
Map<String, Long> tables, TxnStore txnHandler,
List<Long> txns, HiveConf primaryConf) throws Throwable {
AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest();
rqst.setDbName(primaryDbName);
Expand All @@ -361,6 +363,7 @@ List<Long> allocateWriteIdsForTablesAndAcquireLocks(String primaryDbName, Map<St
for (long txnId : txns) {
LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE,
primaryDbName);
comp.setCatName(primaryCatName);
comp.setTablename(entry.getKey());
comp.setOperationType(DataOperationType.UPDATE);
List<LockComponent> components = new ArrayList<LockComponent>(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ public void testReverseBootstrap() throws Throwable {
Map<String, Long> tablesInSecDb = new HashMap<>();
tablesInSecDb.put("t1", (long) numTxnsForSecDb + 4);
tablesInSecDb.put("t2", (long) numTxnsForSecDb + 4);
List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName + "_extra",
tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
tearDownLockIds.addAll(lockIdsForSecDb);

Expand All @@ -576,8 +576,8 @@ public void testReverseBootstrap() throws Throwable {
Map<String, Long> tablesInSourceDb = new HashMap<>();
tablesInSourceDb.put("t1", (long) numTxnsForPrimaryDb + 6);
tablesInSourceDb.put("t2", (long) numTxnsForPrimaryDb);
List<Long> lockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(replicatedDbName, tablesInSourceDb, txnHandler,
txnsForSourceDb, replica.getConf());
List<Long> lockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, replicatedDbName,
tablesInSourceDb, txnHandler, txnsForSourceDb, replica.getConf());
tearDownLockIds.addAll(lockIdsForSourceDb);

//Open 1 txn with no hive locks acquired
Expand Down Expand Up @@ -1092,7 +1092,7 @@ private List<String> setUpFirstIterForOptimisedBootstrap() throws Throwable {
Map<String, Long> tablesInSecDb = new HashMap<>();
tablesInSecDb.put("t1", (long) numTxnsForSecDb);
tablesInSecDb.put("t2", (long) numTxnsForSecDb);
List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName + "_extra",
tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
tearDownLockIds.addAll(lockIdsForSecDb);

Expand All @@ -1105,8 +1105,8 @@ private List<String> setUpFirstIterForOptimisedBootstrap() throws Throwable {
Map<String, Long> tablesInSourceDb = new HashMap<>();
tablesInSourceDb.put("t1", (long) numTxnsForPrimaryDb);
tablesInSourceDb.put("t5", (long) numTxnsForPrimaryDb);
List<Long> lockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName, tablesInSourceDb, txnHandler,
txnsForSourceDb, primary.getConf());
List<Long> lockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName,
tablesInSourceDb, txnHandler, txnsForSourceDb, primary.getConf());
tearDownLockIds.addAll(lockIdsForSourceDb);

//Open 1 txn with no hive locks acquired
Expand Down Expand Up @@ -1157,7 +1157,7 @@ private List<String> setUpFirstIterForOptimisedBootstrap() throws Throwable {
Map<String, Long> newTablesForSecDb = new HashMap<>();
newTablesForSecDb.put("t1", (long) numTxnsForSecDb + 1);
newTablesForSecDb.put("t2", (long) numTxnsForSecDb + 1);
List<Long> newLockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
List<Long> newLockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName + "_extra",
newTablesForSecDb, txnHandler, newTxnsForSecDb, primaryConf);
tearDownLockIds.addAll(newLockIdsForSecDb);

Expand All @@ -1169,8 +1169,8 @@ private List<String> setUpFirstIterForOptimisedBootstrap() throws Throwable {
Map<String, Long> newTablesInSourceDb = new HashMap<>();
newTablesInSourceDb.put("t1", (long) 5);
newTablesInSourceDb.put("t5", (long) 3);
List<Long> newLockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName, newTablesInSourceDb, txnHandler,
newTxnsForSourceDb, primary.getConf());
List<Long> newLockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName,
newTablesInSourceDb, txnHandler, newTxnsForSourceDb, primary.getConf());
tearDownLockIds.addAll(newLockIdsForSourceDb);

//Open 1 txn with no hive locks acquired
Expand Down
Loading