diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index 0891bad2bdf5..c37917d0e9ae 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -53,6 +53,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.AbstractMap; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -61,6 +62,9 @@ import java.util.concurrent.Callable; import java.util.stream.Collectors; +import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch; +import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase; +import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable; import static org.apache.paimon.jdbc.JdbcCatalogLock.acquireTimeout; import static org.apache.paimon.jdbc.JdbcCatalogLock.checkMaxSleep; import static org.apache.paimon.jdbc.JdbcUtils.deleteProperties; @@ -112,6 +116,10 @@ public JdbcClientPool getConnections() { return connections; } + public String getCatalogKey() { + return catalogKey; + } + /** Initialize catalog tables. */ private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedException { String uri = options.get(CatalogOptions.URI.key()); @@ -301,19 +309,11 @@ protected void createTableImpl(Identifier identifier, Schema schema) { runWithLock(identifier, () -> schemaManager.createTable(schema)); // Update schema metadata Path path = getTableLocation(identifier); - int insertRecord = - connections.run( - conn -> { - try (PreparedStatement sql = - conn.prepareStatement( - JdbcUtils.DO_COMMIT_CREATE_TABLE_SQL)) { - sql.setString(1, catalogKey); - sql.setString(2, identifier.getDatabaseName()); - sql.setString(3, identifier.getTableName()); - return sql.executeUpdate(); - } - }); - if (insertRecord == 1) { + if (JdbcUtils.insertTable( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName())) { LOG.debug("Successfully committed to new table: {}", identifier); } else { try { @@ -415,6 +415,80 @@ public T runWithLock(Identifier identifier, Callable callable) throws Exc return Lock.fromCatalog(lock, identifier).runWithLock(callable); } + @Override + public void repairCatalog() { + List databases; + try { + databases = listDatabasesInFileSystem(new Path(warehouse)); + } catch (IOException e) { + throw new RuntimeException("Failed to list databases in file system", e); + } + for (String database : databases) { + repairDatabase(database); + } + } + + @Override + public void repairDatabase(String databaseName) { + checkNotSystemDatabase(databaseName); + + // First check if database exists in file system + Path databasePath = newDatabasePath(databaseName); + List tables; + try { + if (!fileIO.exists(databasePath)) { + throw new RuntimeException("Database directory does not exist: " + databasePath); + } + tables = listTablesInFileSystem(databasePath); + } catch (IOException e) { + throw new RuntimeException(e); + } + + if (!JdbcUtils.databaseExists(connections, catalogKey, databaseName)) { + createDatabaseImpl(databaseName, Collections.emptyMap()); + } + + // Repair tables + for (String table : tables) { + try { + repairTable(Identifier.create(databaseName, table)); + } catch (TableNotExistException ignore) { + // Table might not exist due to concurrent operations + } + } + } + + @Override + public void repairTable(Identifier identifier) throws TableNotExistException { + checkNotBranch(identifier, "repairTable"); + checkNotSystemTable(identifier, "repairTable"); + + // First check if table exists in file system + Path tableLocation = getTableLocation(identifier); + TableSchema tableSchema = + tableSchemaInFileSystem(tableLocation, identifier.getBranchNameOrDefault()) + .orElseThrow(() -> new TableNotExistException(identifier)); + + if (!JdbcUtils.databaseExists(connections, catalogKey, identifier.getDatabaseName())) { + createDatabaseImpl(identifier.getDatabaseName(), Collections.emptyMap()); + } + // Table exists in file system, now check if it exists in JDBC catalog + if (!JdbcUtils.tableExists( + connections, catalogKey, identifier.getDatabaseName(), identifier.getTableName())) { + // Table missing from JDBC catalog, repair it + if (JdbcUtils.insertTable( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName())) { + LOG.debug("Successfully repaired table: {}", identifier); + } else { + LOG.error("Failed to repair table: {}", identifier); + } + } + // If table exists in both file system and JDBC catalog, nothing to repair + } + @Override public void close() throws Exception { connections.close(); diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java index 1b9e599d72bc..bbecda0bc617 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java @@ -377,6 +377,27 @@ public static int execute( } } + public static boolean insertTable( + JdbcClientPool connections, String catalogKey, String databaseName, String tableName) { + try { + int insertRecord = + connections.run( + conn -> { + try (PreparedStatement sql = + conn.prepareStatement( + JdbcUtils.DO_COMMIT_CREATE_TABLE_SQL)) { + sql.setString(1, catalogKey); + sql.setString(2, databaseName); + sql.setString(3, tableName); + return sql.executeUpdate(); + } + }); + return insertRecord == 1; + } catch (SQLException | InterruptedException e) { + throw new RuntimeException("Failed to insert table: " + tableName, e); + } + } + public static boolean insertProperties( JdbcClientPool connections, String storeKey, diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java index 9e106ad966b0..9e7204ec9f31 100644 --- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java @@ -24,8 +24,12 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; import org.apache.paimon.table.Table; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; import org.junit.jupiter.api.BeforeEach; @@ -34,6 +38,8 @@ import java.io.ByteArrayOutputStream; import java.io.ObjectOutputStream; import java.sql.SQLException; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -129,4 +135,199 @@ public void testSerializeTable() throws Exception { protected boolean supportsAlterDatabase() { return true; } + + @Test + public void testRepairTableNotExist() throws Exception { + String databaseName = "repair_db"; + String tableName = "nonexistent_table"; + + catalog.createDatabase(databaseName, false); + Identifier identifier = Identifier.create(databaseName, tableName); + + // Test repair on non-existent table - should throw TableNotExistException + assertThatThrownBy(() -> catalog.repairTable(identifier)) + .isInstanceOf(Catalog.TableNotExistException.class); + } + + @Test + public void testRepairTableWithSystemTable() { + Identifier systemTableId = Identifier.create("sys", "system_table"); + + // System tables should not be repairable + assertThatThrownBy(() -> catalog.repairTable(systemTableId)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("sys"); + } + + @Test + public void testRepairTable() throws Exception { + String databaseName = "fs_repair_db"; + String tableName = "fs_repair_table"; + + // Create table normally (this creates both filesystem and JDBC entries) + catalog.createDatabase(databaseName, false); + Identifier identifier = Identifier.create(databaseName, tableName); + catalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false); + + // Verify table exists in both places + assertThat(catalog.listTables(databaseName)).contains(tableName); + assertDoesNotThrow(() -> catalog.getTable(identifier)); + + // Repair on existing table should work fine (idempotent operation) + assertDoesNotThrow(() -> catalog.repairTable(identifier)); + + // Table should still exist and be accessible + assertThat(catalog.listTables(databaseName)).contains(tableName); + assertDoesNotThrow(() -> catalog.getTable(identifier)); + + // Test repair when table is missing from JDBC store + JdbcCatalog jdbcCatalog = (JdbcCatalog) catalog; + + // Remove table from JDBC store but leave filesystem intact + JdbcUtils.execute( + jdbcCatalog.getConnections(), + JdbcUtils.DROP_TABLE_SQL, + jdbcCatalog.getCatalogKey(), + databaseName, + tableName); + + // Verify table is missing from JDBC catalog + assertThat(catalog.listTables(databaseName)).doesNotContain(tableName); + assertThatThrownBy(() -> catalog.getTable(identifier)) + .isInstanceOf(Catalog.TableNotExistException.class); + + // Repair the table - should recreate it in JDBC store + assertDoesNotThrow(() -> catalog.repairTable(identifier)); + + // Verify table is back in JDBC catalog after repair + assertThat(catalog.listTables(databaseName)).contains(tableName); + assertDoesNotThrow(() -> catalog.getTable(identifier)); + } + + @Test + public void testRepairDatabase() throws Exception { + String databaseName = "repair_database"; + + // Create database and some tables + catalog.createDatabase(databaseName, false); + catalog.createTable(Identifier.create(databaseName, "table1"), DEFAULT_TABLE_SCHEMA, false); + catalog.createTable(Identifier.create(databaseName, "table2"), DEFAULT_TABLE_SCHEMA, false); + + // Test repair database - should not throw exception and should work correctly + assertDoesNotThrow(() -> catalog.repairDatabase(databaseName)); + + // Verify tables still exist after repair + List tables = catalog.listTables(databaseName); + assertThat(tables).containsExactlyInAnyOrder("table1", "table2"); + + // Test repair when database is missing from JDBC store + JdbcCatalog jdbcCatalog = (JdbcCatalog) catalog; + + // Remove database from JDBC store (this also removes tables) + JdbcUtils.execute( + jdbcCatalog.getConnections(), + JdbcUtils.DELETE_TABLES_SQL, + jdbcCatalog.getCatalogKey(), + databaseName); + JdbcUtils.execute( + jdbcCatalog.getConnections(), + JdbcUtils.DELETE_ALL_DATABASE_PROPERTIES_SQL, + jdbcCatalog.getCatalogKey(), + databaseName); + + // Verify database is missing from JDBC catalog + assertThat(catalog.listDatabases()).doesNotContain(databaseName); + assertThatThrownBy(() -> catalog.getDatabase(databaseName)) + .isInstanceOf(Catalog.DatabaseNotExistException.class); + + // Repair the database - should recreate database and tables in JDBC store + assertDoesNotThrow(() -> catalog.repairDatabase(databaseName)); + + // Verify database and tables are back in JDBC catalog after repair + assertThat(catalog.listDatabases()).contains(databaseName); + assertThat(catalog.listTables(databaseName)).containsExactlyInAnyOrder("table1", "table2"); + assertDoesNotThrow(() -> catalog.getDatabase(databaseName)); + } + + @Test + public void testRepairDatabaseSystemDatabase() { + // System database should not be repairable + assertThatThrownBy(() -> catalog.repairDatabase("sys")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("sys"); + } + + @Test + public void testRepairDatabaseNotExists() throws Exception { + String nonExistentDb = "non_existent_db"; + + // Repairing a non-existent database should throw RuntimeException + assertThatThrownBy(() -> catalog.repairDatabase(nonExistentDb)) + .isInstanceOf(RuntimeException.class); + + // Database should not exist after failed repair + assertThat(catalog.listDatabases()).doesNotContain(nonExistentDb); + } + + @Test + public void testRepairCatalog() throws Exception { + // Create multiple databases with tables + String[] databases = {"repair_db1", "repair_db2", "repair_db3"}; + + Schema schema = + new Schema( + Lists.newArrayList( + new DataField(0, "id", DataTypes.INT()), + new DataField(1, "data", DataTypes.STRING())), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyMap(), + ""); + + for (String dbName : databases) { + catalog.createDatabase(dbName, false); + catalog.createTable(Identifier.create(dbName, "test_table"), schema, false); + } + + // Test repair entire catalog - should not throw exception + assertDoesNotThrow(() -> catalog.repairCatalog()); + + // Verify all databases and tables still exist + List catalogDatabases = catalog.listDatabases(); + for (String dbName : databases) { + assertThat(catalogDatabases).contains(dbName); + assertThat(catalog.listTables(dbName)).contains("test_table"); + } + } + + @Test + public void testInsertTableUtility() throws Exception { + String databaseName = "insert_test_db"; + String tableName = "insert_test_table"; + + catalog.createDatabase(databaseName, false); + + JdbcCatalog jdbcCatalog = (JdbcCatalog) catalog; + + // Test insertTable utility method + boolean result = + JdbcUtils.insertTable( + jdbcCatalog.getConnections(), + jdbcCatalog.getCatalogKey(), + databaseName, + tableName); + + assertThat(result).isTrue(); + + // Try inserting the same table again - should throw exception for duplicate + assertThatThrownBy( + () -> + JdbcUtils.insertTable( + jdbcCatalog.getConnections(), + jdbcCatalog.getCatalogKey(), + databaseName, + tableName)) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Failed to insert table"); + } }