From 827ea64b8634661b0cc4dabbc5a3a85d6472980f Mon Sep 17 00:00:00 2001 From: Junbo Wang Date: Tue, 24 Mar 2026 16:45:03 +0800 Subject: [PATCH 1/6] [lake/tiering] add table dropped handling and nullable write results --- .../apache/fluss/lake/writer/LakeWriter.java | 5 +- .../committer/TieringCommitOperator.java | 11 ++ .../event/TieringTableDroppedEvent.java | 63 ++++++++ .../source/TableBucketWriteResult.java | 16 +- .../TableBucketWriteResultSerializer.java | 10 +- .../source/TieringSourceFetcherManager.java | 56 ++++--- .../tiering/source/TieringSourceReader.java | 9 +- .../tiering/source/TieringSplitReader.java | 153 +++++++++++++++++- .../enumerator/TieringSourceEnumerator.java | 68 +++++++- .../committer/TieringCommitOperatorTest.java | 59 ++++++- .../TableBucketWriteResultSerializerTest.java | 32 +++- .../source/TieringSourceReaderTest.java | 108 +++++++++++++ .../lake/paimon/tiering/PaimonLakeWriter.java | 7 + .../lake/paimon/tiering/RecordWriter.java | 10 ++ 14 files changed, 574 insertions(+), 33 deletions(-) create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringTableDroppedEvent.java diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java b/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java index c3ab3a352a..b8b004f601 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java @@ -20,6 +20,8 @@ import org.apache.fluss.annotation.PublicEvolving; import org.apache.fluss.record.LogRecord; +import javax.annotation.Nullable; + import java.io.Closeable; import java.io.IOException; @@ -43,8 +45,9 @@ public interface LakeWriter extends Closeable { /** * Completes the writing process and returns the write result. * - * @return the write result + * @return the write result, or null if no data was written (empty write scenario) * @throws IOException if an I/O error occurs */ + @Nullable WriteResult complete() throws IOException; } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java index 2f3a69a7ea..33c65a0c8c 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java @@ -151,6 +151,17 @@ public void processElement(StreamRecord> str collectTableAllBucketWriteResult(tableId); if (committableWriteResults != null) { + // Check if any result is cancelled (table was dropped) + boolean isCancelled = + committableWriteResults.stream().anyMatch(TableBucketWriteResult::isCancelled); + if (isCancelled) { + LOG.info( + "Skipping commit for dropped table {}, table path {}.", + tableId, + tableBucketWriteResult.tablePath()); + collectedTableBucketWriteResults.remove(tableId); + return; + } try { CommitResult commitResult = commitWriteResults( diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringTableDroppedEvent.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringTableDroppedEvent.java new file mode 100644 index 0000000000..656810288b --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringTableDroppedEvent.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering.event; + +import org.apache.flink.api.connector.source.SourceEvent; + +import java.util.Objects; + +/** + * SourceEvent used to notify TieringSourceReader that a table has been dropped or recreated, and + * all pending splits for this table should be skipped. + */ +public class TieringTableDroppedEvent implements SourceEvent { + + private static final long serialVersionUID = 1L; + + private final long tableId; + + public TieringTableDroppedEvent(long tableId) { + this.tableId = tableId; + } + + public long getTableId() { + return tableId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TieringTableDroppedEvent)) { + return false; + } + TieringTableDroppedEvent that = (TieringTableDroppedEvent) o; + return tableId == that.tableId; + } + + @Override + public int hashCode() { + return Objects.hashCode(tableId); + } + + @Override + public String toString() { + return "TieringTableDroppedEvent{" + "tableId=" + tableId + '}'; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java index abec3c6c21..72378ad70f 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java @@ -30,6 +30,11 @@ * that the write result is for, the end log offset of tiering, the total number of write results in * one round of tiering. It'll be passed to downstream committer operator to collect all the write * results of a table and do commit. + * + *

When {@code cancelled} is {@code true}, it indicates this result originates from a tiering + * round that was aborted (e.g., the table was dropped). In this case the {@link WriteResult} will + * always be {@code null} and the downstream committer should skip the commit and instead report the + * cancellation back to the coordinator. */ public class TableBucketWriteResult implements Serializable { @@ -57,6 +62,9 @@ public class TableBucketWriteResult implements Serializable { // for the round of tiering is finished private final int numberOfWriteResults; + // indicates whether this result is from a cancelled tiering (e.g., table was dropped) + private final boolean cancelled; + public TableBucketWriteResult( TablePath tablePath, TableBucket tableBucket, @@ -64,7 +72,8 @@ public TableBucketWriteResult( @Nullable WriteResult writeResult, long logEndOffset, long maxTimestamp, - int numberOfWriteResults) { + int numberOfWriteResults, + boolean cancelled) { this.tablePath = tablePath; this.tableBucket = tableBucket; this.partitionName = partitionName; @@ -72,6 +81,7 @@ public TableBucketWriteResult( this.logEndOffset = logEndOffset; this.maxTimestamp = maxTimestamp; this.numberOfWriteResults = numberOfWriteResults; + this.cancelled = cancelled; } public TablePath tablePath() { @@ -103,4 +113,8 @@ public long logEndOffset() { public long maxTimestamp() { return maxTimestamp; } + + public boolean isCancelled() { + return cancelled; + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java index 3651760955..34a6d41c0d 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java @@ -91,6 +91,9 @@ public byte[] serialize(TableBucketWriteResult tableBucketWriteResu // serialize number of write results out.writeInt(tableBucketWriteResult.numberOfWriteResults()); + // serialize cancelled flag + out.writeBoolean(tableBucketWriteResult.isCancelled()); + final byte[] result = out.getCopyOfBuffer(); out.clear(); return result; @@ -136,6 +139,10 @@ public TableBucketWriteResult deserialize(int version, byte[] seria long maxTimestamp = in.readLong(); // deserialize number of write results int numberOfWriteResults = in.readInt(); + + // deserialize cancelled flag + boolean cancelled = in.readBoolean(); + return new TableBucketWriteResult<>( tablePath, tableBucket, @@ -143,6 +150,7 @@ public TableBucketWriteResult deserialize(int version, byte[] seria writeResult, logEndOffset, maxTimestamp, - numberOfWriteResults); + numberOfWriteResults, + cancelled); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java index ac72aad664..95de907576 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java @@ -55,41 +55,61 @@ public TieringSourceFetcherManager( } public void markTableReachTieringMaxDuration(long tableId) { + LOG.info("Enqueueing handleTableReachTieringMaxDuration task for table {}", tableId); + enqueueTaskForTable( + tableId, + reader -> { + LOG.debug( + "Executing handleTableReachTieringMaxDuration in split reader for table {}", + tableId); + reader.handleTableReachTieringMaxDuration(tableId); + }, + "handleTableReachTieringMaxDuration"); + } + + public void markTableDropped(long tableId) { + LOG.info("Enqueueing handleTableDropped task for table {}", tableId); + enqueueTaskForTable( + tableId, + reader -> { + LOG.debug("Executing handleTableDropped in split reader for table {}", tableId); + reader.handleTableDropped(tableId); + }, + "handleTableDropped"); + } + + private void enqueueTaskForTable( + long tableId, Consumer> action, String actionDesc) { + SplitFetcher, TieringSplit> splitFetcher; if (!fetchers.isEmpty()) { - // The fetcher thread is still running. This should be the majority of the cases. - LOG.info("fetchers is not empty, marking tiering max duration for table {}", tableId); - fetchers.values() - .forEach( - splitFetcher -> - enqueueMarkTableReachTieringMaxDurationTask( - splitFetcher, tableId)); + LOG.info("Fetchers are active, enqueueing {} task for table {}", actionDesc, tableId); + fetchers.values().forEach(f -> enqueueReaderTask(f, action)); } else { - SplitFetcher, TieringSplit> splitFetcher = - createSplitFetcher(); LOG.info( - "fetchers is empty, enqueue marking tiering max duration for table {}", + "No active fetchers, creating new fetcher and enqueueing {} task for table {}", + actionDesc, tableId); - enqueueMarkTableReachTieringMaxDurationTask(splitFetcher, tableId); + splitFetcher = createSplitFetcher(); + enqueueReaderTask(splitFetcher, action); startFetcher(splitFetcher); } } - private void enqueueMarkTableReachTieringMaxDurationTask( + @SuppressWarnings("unchecked") + private void enqueueReaderTask( SplitFetcher, TieringSplit> splitFetcher, - long reachTieringDeadlineTable) { + Consumer> action) { splitFetcher.enqueueTask( new SplitFetcherTask() { @Override public boolean run() { - ((TieringSplitReader) splitFetcher.getSplitReader()) - .handleTableReachTieringMaxDuration(reachTieringDeadlineTable); + action.accept( + (TieringSplitReader) splitFetcher.getSplitReader()); return true; } @Override - public void wakeUp() { - // do nothing - } + public void wakeUp() {} }); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java index eae584efed..562708628c 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java @@ -22,6 +22,7 @@ import org.apache.fluss.client.Connection; import org.apache.fluss.flink.adapter.SingleThreadMultiplexSourceReaderBaseAdapter; import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; +import org.apache.fluss.flink.tiering.event.TieringTableDroppedEvent; import org.apache.fluss.flink.tiering.source.split.TieringSplit; import org.apache.fluss.flink.tiering.source.state.TieringSplitState; import org.apache.fluss.lake.writer.LakeTieringFactory; @@ -125,9 +126,15 @@ public void handleSourceEvents(SourceEvent sourceEvent) { TieringReachMaxDurationEvent reachMaxDurationEvent = (TieringReachMaxDurationEvent) sourceEvent; long tableId = reachMaxDurationEvent.getTableId(); - LOG.info("Received reach max duration for table {}", tableId); + LOG.info("Received reach max duration event for table {}", tableId); ((TieringSourceFetcherManager) splitFetcherManager) .markTableReachTieringMaxDuration(tableId); + } else if (sourceEvent instanceof TieringTableDroppedEvent) { + TieringTableDroppedEvent tableDroppedEvent = (TieringTableDroppedEvent) sourceEvent; + long tableId = tableDroppedEvent.getTableId(); + LOG.info("Received table dropped event for table {}", tableId); + ((TieringSourceFetcherManager) splitFetcherManager) + .markTableDropped(tableId); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java index 1e530d69a3..4abaa07b8c 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java @@ -84,6 +84,7 @@ public class TieringSplitReader private final Map> pendingTieringSplits; private final Set reachTieringMaxDurationTables; + private final Set droppedTables; private final Map> lakeWriters; private final Connection connection; @@ -128,6 +129,7 @@ protected TieringSplitReader( this.lakeWriters = new HashMap<>(); this.currentPendingSnapshotSplits = new ArrayDeque<>(); this.reachTieringMaxDurationTables = new HashSet<>(); + this.droppedTables = new HashSet<>(); this.pollTimeout = pollTimeout; } @@ -143,6 +145,13 @@ public RecordsWithSplitIds> fetch() throws I currentEmptySplits.clear(); return records; } + + // Check droppedTables BEFORE checkSplitOrStartNext to quickly respond to already-marked + // current table + if (currentTableId != null && droppedTables.contains(currentTableId)) { + return forceCompleteDroppedTable(); + } + checkSplitOrStartNext(); // may read snapshot firstly @@ -164,7 +173,26 @@ public RecordsWithSplitIds> fetch() throws I if (reachTieringMaxDurationTables.contains(currentTableId)) { return forceCompleteTieringLogRecords(); } - ScanRecords scanRecords = currentLogScanner.poll(pollTimeout); + ScanRecords scanRecords; + try { + scanRecords = currentLogScanner.poll(pollTimeout); + } catch (Exception e) { + // When a table is actually dropped, the log scanner's poll may fail + // because metadata update discovers the table no longer exists. + if (droppedTables.contains(currentTableId)) { + LOG.warn( + "Log scanner poll failed for dropped table {}, force completing.", + currentTableId, + e); + return forceCompleteDroppedTable(); + } + // The table may have been dropped but TieringTableDroppedEvent + // hasn't been processed yet. Return empty to let the fetcher + // retry on next iteration when the event will be processed. + LOG.warn( + "Log scanner poll failed for table {}, will retry.", currentTableId, e); + return emptyTableBucketWriteResultWithSplitIds(); + } return forLogRecords(scanRecords); } else { return emptyTableBucketWriteResultWithSplitIds(); @@ -445,7 +473,8 @@ private TableBucketWriteResult completeLakeWriter( writeResult, logEndOffset, maxTimestamp, - checkNotNull(currentTableNumberOfSplits)); + checkNotNull(currentTableNumberOfSplits), + false); } private TableBucketWriteResultWithSplitIds forEmptySplits(Set emptySplits) { @@ -463,7 +492,8 @@ private TableBucketWriteResultWithSplitIds forEmptySplits(Set empt null, UNKNOWN_BUCKET_OFFSET, UNKNOWN_BUCKET_TIMESTAMP, - tieringSplit.getNumberOfSplits())); + tieringSplit.getNumberOfSplits(), + false)); } return new TableBucketWriteResultWithSplitIds(writeResults, finishedSplitIds); } @@ -544,6 +574,7 @@ private void finishCurrentTable() throws IOException { throw new IOException("Fail to finish current table.", e); } reachTieringMaxDurationTables.remove(currentTableId); + droppedTables.remove(currentTableId); // before switch to a new table, mark all as empty or null currentTableId = null; currentTablePath = null; @@ -570,6 +601,116 @@ public void handleTableReachTieringMaxDuration(long tableId) { } } + /** + * Handle a table being dropped. This will mark the table as dropped, and it will be force + * completed with empty results in the next fetch cycle. + * + *

For the currently active table, the dropped flag is set so that {@link #fetch()} detects + * it at the start of the next cycle and calls {@link #forceCompleteDroppedTable()}. For tables + * in {@code pendingTieringSplits}, the flag is also set here; those splits will be skipped when + * they become the active table and the dropped flag is detected. + * + * @param tableId the id of the dropped table + */ + public void handleTableDropped(long tableId) { + LOG.info( + "handleTableDropped, tableId: {}, currentTableId: {}, pendingTieringSplits: {}", + tableId, + currentTableId, + pendingTieringSplits); + if ((currentTableId != null && currentTableId.equals(tableId)) + || pendingTieringSplits.containsKey(tableId)) { + // Current table is being dropped, mark it for force completion in next fetch + LOG.info("Table {} is dropped, will force to complete with empty results.", tableId); + droppedTables.add(tableId); + } + } + + /** + * Force complete tiering for a dropped table. This will close any in-progress lake writers + * without completing (discarding uncommitted data), then finish all remaining splits with null + * write results. + */ + private RecordsWithSplitIds> forceCompleteDroppedTable() + throws IOException { + LOG.info("Force completing dropped table {}", currentTableId); + + Map> writeResults = new HashMap<>(); + Map finishedSplitIds = new HashMap<>(); + + // Generate empty results for all splits (both log and snapshot) + Iterator> splitsIterator = + currentTableSplitsByBucket.entrySet().iterator(); + while (splitsIterator.hasNext()) { + Map.Entry entry = splitsIterator.next(); + TableBucket bucket = entry.getKey(); + TieringSplit split = entry.getValue(); + if (split != null) { + // Close lake writer without complete - discard data for dropped table + LakeWriter lakeWriter = lakeWriters.remove(bucket); + if (lakeWriter != null) { + try { + lakeWriter.close(); + } catch (Exception e) { + LOG.warn("Failed to close lake writer for bucket {}", bucket, e); + } + } + + TableBucketWriteResult bucketResult = + toTableBucketWriteResult( + split.getTablePath(), + bucket, + split.getPartitionName(), + null, + UNKNOWN_BUCKET_OFFSET, + UNKNOWN_BUCKET_TIMESTAMP, + checkNotNull(currentTableNumberOfSplits), + true); + writeResults.put(bucket, bucketResult); + finishedSplitIds.put(bucket, split.splitId()); + LOG.info( + "Split {} is forced to be finished due to table dropped with empty result.", + split.splitId()); + splitsIterator.remove(); + } + } + + // Close any remaining lake writers that don't have corresponding splits + for (Map.Entry> entry : lakeWriters.entrySet()) { + try { + entry.getValue().close(); + } catch (Exception e) { + LOG.warn("Failed to close orphan lake writer for bucket {}", entry.getKey(), e); + } + } + lakeWriters.clear(); + + // Also handle pending snapshot splits for this table + while (!currentPendingSnapshotSplits.isEmpty()) { + TieringSnapshotSplit snapshotSplit = currentPendingSnapshotSplits.poll(); + TableBucket bucket = snapshotSplit.getTableBucket(); + TableBucketWriteResult emptyResult = + toTableBucketWriteResult( + snapshotSplit.getTablePath(), + bucket, + snapshotSplit.getPartitionName(), + null, + UNKNOWN_BUCKET_OFFSET, + UNKNOWN_BUCKET_TIMESTAMP, + checkNotNull(currentTableNumberOfSplits), + true); + writeResults.put(bucket, emptyResult); + finishedSplitIds.put(bucket, snapshotSplit.splitId()); + LOG.info( + "Pending snapshot split {} is forced to be finished due to table dropped.", + snapshotSplit.splitId()); + } + + // Note: droppedTables.remove is handled by finishCurrentTable() + finishCurrentTable(); + return new TableBucketWriteResultWithSplitIds(writeResults, finishedSplitIds); + } + @Override public void wakeUp() { if (currentLogScanner != null) { @@ -625,7 +766,8 @@ private TableBucketWriteResult toTableBucketWriteResult( @Nullable WriteResult writeResult, long endLogOffset, long maxTimestamp, - int numberOfSplits) { + int numberOfSplits, + boolean cancelled) { return new TableBucketWriteResult<>( tablePath, tableBucket, @@ -633,7 +775,8 @@ private TableBucketWriteResult toTableBucketWriteResult( writeResult, endLogOffset, maxTimestamp, - numberOfSplits); + numberOfSplits, + cancelled); } private class TableBucketWriteResultWithSplitIds diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java index 337222f4e3..05ed379694 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java @@ -27,6 +27,7 @@ import org.apache.fluss.flink.tiering.event.FailedTieringEvent; import org.apache.fluss.flink.tiering.event.FinishedTieringEvent; import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; +import org.apache.fluss.flink.tiering.event.TieringTableDroppedEvent; import org.apache.fluss.flink.tiering.source.split.TieringSplit; import org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator; import org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState; @@ -39,9 +40,12 @@ import org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest; import org.apache.fluss.rpc.messages.LakeTieringHeartbeatResponse; import org.apache.fluss.rpc.messages.PbHeartbeatReqForTable; +import org.apache.fluss.rpc.messages.PbHeartbeatRespForTable; import org.apache.fluss.rpc.messages.PbLakeTieringStats; import org.apache.fluss.rpc.messages.PbLakeTieringTableInfo; import org.apache.fluss.rpc.metrics.ClientMetricGroup; +import org.apache.fluss.rpc.protocol.ApiError; +import org.apache.fluss.rpc.protocol.Errors; import org.apache.fluss.utils.MapUtils; import org.apache.flink.api.connector.source.ReaderInfo; @@ -407,8 +411,9 @@ private void assignSplits() { currentFailedTableEpochs, tieringTableEpochs); + LakeTieringHeartbeatResponse heartbeatResponse; if (pendingSplits.isEmpty() && !readersAwaitingSplit.isEmpty()) { - LakeTieringHeartbeatResponse heartbeatResponse = + heartbeatResponse = waitHeartbeatResponse( coordinatorGateway.lakeTieringHeartbeat( heartBeatWithRequestNewTieringTable(tieringHeartbeatRequest))); @@ -427,9 +432,14 @@ private void assignSplits() { } } else { // report heartbeat to fluss coordinator - waitHeartbeatResponse(coordinatorGateway.lakeTieringHeartbeat(tieringHeartbeatRequest)); + heartbeatResponse = + waitHeartbeatResponse( + coordinatorGateway.lakeTieringHeartbeat(tieringHeartbeatRequest)); } + // Process tiering_table_resp to detect table deletion errors + handleTieringTableResponseErrors(heartbeatResponse); + // if come to here, we can remove currentFinishedTables/failedTableEpochs to avoid send // in next round currentFinishedTables.forEach(finishedTables::remove); @@ -437,6 +447,60 @@ private void assignSplits() { return lakeTieringInfo; } + /** + * Handle errors in tiering_table_resp from heartbeat response. If a table has been dropped, + * mark it as failed and notify readers to skip processing. + */ + private void handleTieringTableResponseErrors(LakeTieringHeartbeatResponse heartbeatResponse) { + for (PbHeartbeatRespForTable resp : heartbeatResponse.getTieringTableRespsList()) { + if (resp.hasError()) { + ApiError error = ApiError.fromErrorMessage(resp.getError()); + Errors errors = error.error(); + // Check if the error indicates table doesn't exist or tiering epoch is fenced + // (which happens when table is dropped and recreated) + if (errors == Errors.TABLE_NOT_EXIST + || errors == Errors.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION + || errors == Errors.FENCED_TIERING_EPOCH_EXCEPTION) { + long tableId = resp.getTableId(); + LOG.warn( + "Table {} is dropped or epoch mismatch (error: {}), canceling tiering.", + tableId, + errors); + handleTableDropped(tableId); + } + } + } + } + + /** + * Handle a dropped table by marking all related splits to skip, removing from tiering epochs, + * and notifying readers. + */ + private void handleTableDropped(long tableId) { + // Remove from tiering table epochs + Long tieringEpoch = tieringTableEpochs.remove(tableId); + + // Mark all pending splits for this table to skip current round + for (TieringSplit tieringSplit : pendingSplits) { + if (tieringSplit.getTableBucket().getTableId() == tableId) { + tieringSplit.skipCurrentRound(); + } + } + + // Add to failed table epochs to notify coordinator in next heartbeat + if (tieringEpoch != null) { + failedTableEpochs.put(tableId, tieringEpoch); + } + + // Broadcast the event to all readers to stop processing this table + Set readers = new HashSet<>(context.registeredReaders().keySet()); + for (int reader : readers) { + TieringTableDroppedEvent event = new TieringTableDroppedEvent(tableId); + LOG.info("Send {} to reader {} for dropped table", event, reader); + context.sendEventToSourceReader(reader, event); + } + } + private void generateTieringSplits(Tuple3 tieringTable) throws FlinkRuntimeException { if (tieringTable == null) { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java index 70d86bb2c8..06f8895050 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java @@ -391,7 +391,8 @@ void testPartitionedTableCommitWhenFlussMissingLakeSnapshot() throws Exception { writeResult, logEndOffset, maxTimestamp, - numberOfWriteResults); + numberOfWriteResults, + false); } private StreamRecord> @@ -403,6 +404,27 @@ void testPartitionedTableCommitWhenFlussMissingLakeSnapshot() throws Exception { long logEndOffset, long maxTimestamp, int numberOfWriteResults) { + return createTableBucketWriteResultStreamRecord( + tablePath, + tableBucket, + partitionName, + writeResult, + logEndOffset, + maxTimestamp, + numberOfWriteResults, + false); + } + + private StreamRecord> + createTableBucketWriteResultStreamRecord( + TablePath tablePath, + TableBucket tableBucket, + @Nullable String partitionName, + @Nullable Integer writeResult, + long logEndOffset, + long maxTimestamp, + int numberOfWriteResults, + boolean cancelled) { TableBucketWriteResult tableBucketWriteResult = new TableBucketWriteResult<>( tablePath, @@ -411,7 +433,8 @@ void testPartitionedTableCommitWhenFlussMissingLakeSnapshot() throws Exception { writeResult == null ? null : new TestingWriteResult(writeResult), logEndOffset, maxTimestamp, - numberOfWriteResults); + numberOfWriteResults, + cancelled); return new StreamRecord<>(tableBucketWriteResult); } @@ -505,6 +528,38 @@ void testCommitFailsWhenTableRecreated() throws Exception { .contains("dropped and recreated during tiering"); } + @Test + void testCommitSkippedWhenTableDropped() throws Exception { + TablePath tablePath = TablePath.of("fluss", "test_commit_skipped_when_table_dropped"); + long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); + int numberOfWriteResults = 3; + + // Record the number of events before processing + int eventCountBefore = mockOperatorEventGateway.getEventsSent().size(); + + // Send all write results with cancelled=true + for (int bucket = 0; bucket < numberOfWriteResults; bucket++) { + TableBucket tableBucket = new TableBucket(tableId, bucket); + committerOperator.processElement( + createTableBucketWriteResultStreamRecord( + tablePath, + tableBucket, + null, // partitionName + bucket, // writeResult + bucket, // logEndOffset + (long) bucket, // maxTimestamp + numberOfWriteResults, + true)); // cancelled=true + } + + // Verify no lake snapshot was created + verifyNoLakeSnapshot(tablePath); + + // Verify no FinishedTieringEvent or FailedTieringEvent was sent + int eventCountAfter = mockOperatorEventGateway.getEventsSent().size(); + assertThat(eventCountAfter).isEqualTo(eventCountBefore); + } + private CommittedLakeSnapshot mockCommittedLakeSnapshot( long tableId, TablePath tablePath, int snapshotId, Map logEndOffsets) throws Exception { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java index dbb40eae17..9acfbe9ba4 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java @@ -44,7 +44,14 @@ void testSerializeAndDeserialize(boolean isPartitioned) throws Exception { String partitionName = isPartitioned ? "partition1" : null; TableBucketWriteResult tableBucketWriteResult = new TableBucketWriteResult<>( - tablePath, tableBucket, partitionName, testingWriteResult, 10, 30L, 20); + tablePath, + tableBucket, + partitionName, + testingWriteResult, + 10, + 30L, + 20, + false); // test serialize and deserialize byte[] serialized = tableBucketWriteResultSerializer.serialize(tableBucketWriteResult); @@ -60,11 +67,12 @@ void testSerializeAndDeserialize(boolean isPartitioned) throws Exception { assertThat(deserializedWriteResult.getWriteResult()) .isEqualTo(testingWriteResult.getWriteResult()); assertThat(deserialized.numberOfWriteResults()).isEqualTo(20); + assertThat(deserialized.isCancelled()).isFalse(); // verify when writeResult is null tableBucketWriteResult = new TableBucketWriteResult<>( - tablePath, tableBucket, partitionName, null, 20, 30L, 30); + tablePath, tableBucket, partitionName, null, 20, 30L, 30, false); serialized = tableBucketWriteResultSerializer.serialize(tableBucketWriteResult); deserialized = tableBucketWriteResultSerializer.deserialize( @@ -74,5 +82,25 @@ void testSerializeAndDeserialize(boolean isPartitioned) throws Exception { assertThat(deserialized.partitionName()).isEqualTo(partitionName); assertThat(deserialized.writeResult()).isNull(); assertThat(deserialized.numberOfWriteResults()).isEqualTo(30); + assertThat(deserialized.isCancelled()).isFalse(); + + // verify when cancelled is true + tableBucketWriteResult = + new TableBucketWriteResult<>( + tablePath, + tableBucket, + partitionName, + testingWriteResult, + 40, + 50L, + 60, + true); + serialized = tableBucketWriteResultSerializer.serialize(tableBucketWriteResult); + deserialized = + tableBucketWriteResultSerializer.deserialize( + tableBucketWriteResultSerializer.getVersion(), serialized); + assertThat(deserialized.tablePath()).isEqualTo(tablePath); + assertThat(deserialized.tableBucket()).isEqualTo(tableBucket); + assertThat(deserialized.isCancelled()).isTrue(); } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java index 9e9de2c792..ddda19053d 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java @@ -25,6 +25,7 @@ import org.apache.fluss.flink.tiering.TestingLakeTieringFactory; import org.apache.fluss.flink.tiering.TestingWriteResult; import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; +import org.apache.fluss.flink.tiering.event.TieringTableDroppedEvent; import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; import org.apache.fluss.flink.utils.FlinkTestBase; import org.apache.fluss.metadata.TableBucket; @@ -175,6 +176,113 @@ void testHandleTieringReachMaxDurationEvent() throws Exception { } } + @Test + void testHandleTieringTableDroppedEvent() throws Exception { + TablePath tablePath = TablePath.of("fluss", "test_tiering_table_dropped"); + long tableId = createTable(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR); + Configuration conf = new Configuration(FLUSS_CLUSTER_EXTENSION.getClientConfig()); + conf.set( + ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER, + ConfigOptions.NoKeyAssigner.ROUND_ROBIN); + try (Connection connection = ConnectionFactory.createConnection(conf)) { + FutureCompletingBlockingQueue< + RecordsWithSplitIds>> + elementsQueue = new FutureCompletingBlockingQueue<>(16); + TestingReaderContext readerContext = new TestingReaderContext(); + try (TieringSourceReader reader = + new TieringSourceReader<>( + elementsQueue, + readerContext, + connection, + new TestingLakeTieringFactory(), + Duration.ofMillis(500))) { + + reader.start(); + + // write some data first + writeRows( + connection, + tablePath, + Arrays.asList(row(0, "v0"), row(1, "v1"), row(2, "v2")), + true); + + // add a split for the table + TieringLogSplit split = + new TieringLogSplit( + tablePath, + new TableBucket(tableId, 0), + null, + EARLIEST_OFFSET, + 100L); + reader.addSplits(Collections.singletonList(split)); + + // wait to run one round of tiering to do some tiering + FutureCompletingBlockingQueue< + RecordsWithSplitIds>> + blockingQueue = getElementsQueue(reader); + waitUntil( + () -> !blockingQueue.isEmpty(), + Duration.ofSeconds(30), + "Fail to wait element queue is not empty."); + + // send TieringTableDroppedEvent (simulating enumerator's heartbeat detection) + TieringTableDroppedEvent event = new TieringTableDroppedEvent(tableId); + + // Verify TieringTableDroppedEvent accessors for coverage + assertThat(event.getTableId()).isEqualTo(tableId); + assertThat(event).isEqualTo(new TieringTableDroppedEvent(tableId)); + assertThat(event).isNotEqualTo(new TieringTableDroppedEvent(tableId + 1)); + assertThat(event.hashCode()) + .isEqualTo(new TieringTableDroppedEvent(tableId).hashCode()); + assertThat(event.toString()).contains(String.valueOf(tableId)); + + reader.handleSourceEvents(event); + + // actually drop the table so log scanner encounters real + // "unknown table or bucket" error on next fetch + admin.dropTable(tablePath, true).get(); + + // should force to finish with empty results (dropped table discards data) + retry( + Duration.ofMinutes(1), + () -> { + TestingReaderOutput> output = + new TestingReaderOutput<>(); + reader.pollNext(output); + assertThat(output.getEmittedRecords()).hasSize(1); + TableBucketWriteResult result = + output.getEmittedRecords().get(0); + // write result should be null since dropped table discards data + assertThat(result.writeResult()).isNull(); + }); + + // add another split with skipCurrentRound to verify it's handled correctly + split = + new TieringLogSplit( + tablePath, + new TableBucket(tableId, 1), + null, + EARLIEST_OFFSET, + 100L); + split.skipCurrentRound(); + reader.addSplits(Collections.singletonList(split)); + + // should skip tiering for this split + retry( + Duration.ofMinutes(1), + () -> { + TestingReaderOutput> + output1 = new TestingReaderOutput<>(); + reader.pollNext(output1); + assertThat(output1.getEmittedRecords()).hasSize(1); + TableBucketWriteResult result = + output1.getEmittedRecords().get(0); + assertThat(result.writeResult()).isNull(); + }); + } + } + } + /** * Get the elementsQueue from TieringSourceReader using reflection. * diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java index 8472b825b0..d9a3614d38 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java @@ -29,6 +29,8 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.Collections; import java.util.List; @@ -76,6 +78,7 @@ public void write(LogRecord record) throws IOException { } } + @Nullable @Override public PaimonWriteResult complete() throws IOException { CommitMessage commitMessage; @@ -84,6 +87,10 @@ public PaimonWriteResult complete() throws IOException { } catch (Exception e) { throw new IOException("Failed to complete Paimon write.", e); } + if (commitMessage == null) { + // No data was written, return null to indicate empty write + return null; + } return new PaimonWriteResult(commitMessage); } diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java index 413b141898..8f6871bf26 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java @@ -61,8 +61,18 @@ public RecordWriter( public abstract void write(LogRecord record) throws Exception; + /** + * Completes the write process and returns the commit message. + * + * @return the commit message, or null if no data was written (empty write scenario) + */ + @Nullable CommitMessage complete() throws Exception { List commitMessages = tableWrite.prepareCommit(); + if (commitMessages.isEmpty()) { + // No data was written, return null to indicate empty write + return null; + } checkState( commitMessages.size() == 1, "The size of CommitMessage must be 1, but got %s.", From 2e0a90486858337a01c13dcbaac1f821b0f0222f Mon Sep 17 00:00:00 2001 From: Junbo Wang Date: Wed, 25 Mar 2026 11:25:06 +0800 Subject: [PATCH 2/6] properly handle TableNotExistException in metadata operations --- .../client/metadata/MetadataUpdater.java | 4 +++ .../client/metadata/MetadataUpdaterTest.java | 31 +++++++++++++++++++ .../tiering/source/TieringSplitReader.java | 3 +- .../server/coordinator/MetadataManager.java | 2 ++ 4 files changed, 39 insertions(+), 1 deletion(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java index e33a78344d..d65df73253 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java @@ -30,6 +30,7 @@ import org.apache.fluss.exception.PartitionNotExistException; import org.apache.fluss.exception.RetriableException; import org.apache.fluss.exception.StaleMetadataException; +import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePartition; @@ -287,6 +288,9 @@ public void updateMetadata( } else if (t instanceof PartitionNotExistException) { LOG.warn("Failed to update metadata because the partition does not exist", t); throw (PartitionNotExistException) t; + } else if (t instanceof TableNotExistException) { + LOG.warn("Failed to update metadata because the table does not exist", t); + throw (TableNotExistException) t; } else { throw new FlussRuntimeException("Failed to update metadata", t); } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java index 6717cfe5ae..fb35c3833f 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java @@ -22,6 +22,7 @@ import org.apache.fluss.cluster.ServerType; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.StaleMetadataException; +import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.rpc.RpcClient; import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway; import org.apache.fluss.rpc.messages.MetadataRequest; @@ -70,6 +71,24 @@ void testInitializeClusterWithRetries() throws Exception { .hasMessageContaining("The metadata is stale."); } + @Test + void testTableNotExistExceptionIsNotWrapped() throws Exception { + Configuration configuration = new Configuration(); + RpcClient rpcClient = + RpcClient.create(configuration, TestingClientMetricGroup.newInstance(), false); + + // Gateway that throws TableNotExistException on metadata request + AdminReadOnlyGateway gateway = new TestingTableNotExistGateway(); + + // TableNotExistException should propagate directly without being wrapped + assertThatThrownBy( + () -> + MetadataUpdater.tryToInitializeClusterWithRetries( + rpcClient, CS_NODE, gateway, 3)) + .isInstanceOf(TableNotExistException.class) + .hasMessageContaining("test_table"); + } + private static final class TestingAdminReadOnlyGateway extends TestCoordinatorGateway { private final int maxRetryCount; @@ -95,4 +114,16 @@ public CompletableFuture metadata(MetadataRequest request) { } } } + + /** + * A testing gateway that throws TableNotExistException when metadata is requested. Used to + * verify that TableNotExistException is propagated without being wrapped. + */ + private static final class TestingTableNotExistGateway extends TestCoordinatorGateway { + + @Override + public CompletableFuture metadata(MetadataRequest request) { + throw new TableNotExistException("Table 'test_table' does not exist."); + } + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java index 4abaa07b8c..715f0a3bd7 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java @@ -23,6 +23,7 @@ import org.apache.fluss.client.table.scanner.ScanRecord; import org.apache.fluss.client.table.scanner.log.LogScanner; import org.apache.fluss.client.table.scanner.log.ScanRecords; +import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.flink.source.reader.BoundedSplitReader; import org.apache.fluss.flink.source.reader.RecordAndPos; import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; @@ -176,7 +177,7 @@ public RecordsWithSplitIds> fetch() throws I ScanRecords scanRecords; try { scanRecords = currentLogScanner.poll(pollTimeout); - } catch (Exception e) { + } catch (TableNotExistException e) { // When a table is actually dropped, the log scanner's poll may fail // because metadata update discovers the table no longer exists. if (droppedTables.contains(currentTableId)) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index cc2bb5b702..3115eb3011 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -701,6 +701,8 @@ public Map getTables(Collection tablePaths) .getLakeCatalogContainer() .getDefaultTableLakeOptions())); } + } catch (TableNotExistException e) { + throw e; } catch (Exception e) { throw new FlussRuntimeException( String.format("Failed to get tables '%s'.", tablePaths), e); From ca316bee73debf01b948633279d1eda834de0b2c Mon Sep 17 00:00:00 2001 From: Junbo Wang Date: Mon, 30 Mar 2026 22:46:54 +0800 Subject: [PATCH 3/6] remove TableNotExistException handling from metadata updater --- .../client/metadata/MetadataUpdater.java | 4 - .../client/metadata/MetadataUpdaterTest.java | 31 ----- .../TableBucketWriteResultSerializer.java | 11 +- .../tiering/source/TieringSplitReader.java | 41 +++--- .../TableBucketWriteResultSerializerTest.java | 127 ++++++++++-------- .../source/TieringSourceReaderTest.java | 116 +++++++++------- 6 files changed, 167 insertions(+), 163 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java index d65df73253..e33a78344d 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java @@ -30,7 +30,6 @@ import org.apache.fluss.exception.PartitionNotExistException; import org.apache.fluss.exception.RetriableException; import org.apache.fluss.exception.StaleMetadataException; -import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePartition; @@ -288,9 +287,6 @@ public void updateMetadata( } else if (t instanceof PartitionNotExistException) { LOG.warn("Failed to update metadata because the partition does not exist", t); throw (PartitionNotExistException) t; - } else if (t instanceof TableNotExistException) { - LOG.warn("Failed to update metadata because the table does not exist", t); - throw (TableNotExistException) t; } else { throw new FlussRuntimeException("Failed to update metadata", t); } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java index fb35c3833f..6717cfe5ae 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java @@ -22,7 +22,6 @@ import org.apache.fluss.cluster.ServerType; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.StaleMetadataException; -import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.rpc.RpcClient; import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway; import org.apache.fluss.rpc.messages.MetadataRequest; @@ -71,24 +70,6 @@ void testInitializeClusterWithRetries() throws Exception { .hasMessageContaining("The metadata is stale."); } - @Test - void testTableNotExistExceptionIsNotWrapped() throws Exception { - Configuration configuration = new Configuration(); - RpcClient rpcClient = - RpcClient.create(configuration, TestingClientMetricGroup.newInstance(), false); - - // Gateway that throws TableNotExistException on metadata request - AdminReadOnlyGateway gateway = new TestingTableNotExistGateway(); - - // TableNotExistException should propagate directly without being wrapped - assertThatThrownBy( - () -> - MetadataUpdater.tryToInitializeClusterWithRetries( - rpcClient, CS_NODE, gateway, 3)) - .isInstanceOf(TableNotExistException.class) - .hasMessageContaining("test_table"); - } - private static final class TestingAdminReadOnlyGateway extends TestCoordinatorGateway { private final int maxRetryCount; @@ -114,16 +95,4 @@ public CompletableFuture metadata(MetadataRequest request) { } } } - - /** - * A testing gateway that throws TableNotExistException when metadata is requested. Used to - * verify that TableNotExistException is propagated without being wrapped. - */ - private static final class TestingTableNotExistGateway extends TestCoordinatorGateway { - - @Override - public CompletableFuture metadata(MetadataRequest request) { - throw new TableNotExistException("Table 'test_table' does not exist."); - } - } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java index 34a6d41c0d..65d7b7927c 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java @@ -33,7 +33,10 @@ public class TableBucketWriteResultSerializer private static final ThreadLocal SERIALIZER_CACHE = ThreadLocal.withInitial(() -> new DataOutputSerializer(64)); - private static final int CURRENT_VERSION = 1; + private static final int CURRENT_VERSION = 2; + + // Version 1: original format without the cancelled flag + private static final int VERSION_1 = 1; private final org.apache.fluss.lake.serializer.SimpleVersionedSerializer writeResultSerializer; @@ -102,7 +105,7 @@ public byte[] serialize(TableBucketWriteResult tableBucketWriteResu @Override public TableBucketWriteResult deserialize(int version, byte[] serialized) throws IOException { - if (version != CURRENT_VERSION) { + if (version != CURRENT_VERSION && version != VERSION_1) { throw new IOException("Unknown version " + version); } final DataInputDeserializer in = new DataInputDeserializer(serialized); @@ -140,8 +143,8 @@ public TableBucketWriteResult deserialize(int version, byte[] seria // deserialize number of write results int numberOfWriteResults = in.readInt(); - // deserialize cancelled flag - boolean cancelled = in.readBoolean(); + // deserialize cancelled flag (added in version 2; default to false for version 1) + boolean cancelled = (version >= CURRENT_VERSION) && in.readBoolean(); return new TableBucketWriteResult<>( tablePath, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java index 715f0a3bd7..ad392e9a86 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java @@ -23,7 +23,6 @@ import org.apache.fluss.client.table.scanner.ScanRecord; import org.apache.fluss.client.table.scanner.log.LogScanner; import org.apache.fluss.client.table.scanner.log.ScanRecords; -import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.flink.source.reader.BoundedSplitReader; import org.apache.fluss.flink.source.reader.RecordAndPos; import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; @@ -174,26 +173,7 @@ public RecordsWithSplitIds> fetch() throws I if (reachTieringMaxDurationTables.contains(currentTableId)) { return forceCompleteTieringLogRecords(); } - ScanRecords scanRecords; - try { - scanRecords = currentLogScanner.poll(pollTimeout); - } catch (TableNotExistException e) { - // When a table is actually dropped, the log scanner's poll may fail - // because metadata update discovers the table no longer exists. - if (droppedTables.contains(currentTableId)) { - LOG.warn( - "Log scanner poll failed for dropped table {}, force completing.", - currentTableId, - e); - return forceCompleteDroppedTable(); - } - // The table may have been dropped but TieringTableDroppedEvent - // hasn't been processed yet. Return empty to let the fetcher - // retry on next iteration when the event will be processed. - LOG.warn( - "Log scanner poll failed for table {}, will retry.", currentTableId, e); - return emptyTableBucketWriteResultWithSplitIds(); - } + ScanRecords scanRecords = currentLogScanner.poll(pollTimeout); return forLogRecords(scanRecords); } else { return emptyTableBucketWriteResultWithSplitIds(); @@ -280,6 +260,25 @@ private void checkSplitOrStartNext() { } Set pendingSplits = pendingTieringSplits.remove(pendingTableId); + + // If the pending table is already dropped, set minimal table state from split metadata + // without calling getOrMoveToTable() to avoid RPC exception (TableNotExistException). + // The next fetch() cycle will detect the dropped flag and call forceCompleteDroppedTable(). + if (droppedTables.contains(pendingTableId)) { + TieringSplit firstSplit = pendingSplits.iterator().next(); + currentTableId = pendingTableId; + currentTablePath = firstSplit.getTablePath(); + currentTableNumberOfSplits = firstSplit.getNumberOfSplits(); + for (TieringSplit split : pendingSplits) { + currentTableSplitsByBucket.put(split.getTableBucket(), split); + } + LOG.info( + "Skipping RPC for dropped table {} (path: {}), will force complete in next fetch cycle.", + pendingTableId, + currentTablePath); + return; + } + for (TieringSplit split : pendingSplits) { getOrMoveToTable(split); addSplitToCurrentTable(split); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java index 9acfbe9ba4..4521eff339 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java @@ -21,6 +21,8 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -42,65 +44,82 @@ void testSerializeAndDeserialize(boolean isPartitioned) throws Exception { TableBucket tableBucket = isPartitioned ? new TableBucket(1, 1000L, 2) : new TableBucket(1, 2); String partitionName = isPartitioned ? "partition1" : null; - TableBucketWriteResult tableBucketWriteResult = - new TableBucketWriteResult<>( - tablePath, - tableBucket, - partitionName, - testingWriteResult, - 10, - 30L, - 20, - false); - // test serialize and deserialize - byte[] serialized = tableBucketWriteResultSerializer.serialize(tableBucketWriteResult); - TableBucketWriteResult deserialized = - tableBucketWriteResultSerializer.deserialize( - tableBucketWriteResultSerializer.getVersion(), serialized); - - assertThat(deserialized.tablePath()).isEqualTo(tablePath); - assertThat(deserialized.tableBucket()).isEqualTo(tableBucket); - assertThat(deserialized.partitionName()).isEqualTo(partitionName); - TestingWriteResult deserializedWriteResult = deserialized.writeResult(); - assertThat(deserializedWriteResult).isNotNull(); - assertThat(deserializedWriteResult.getWriteResult()) + TableBucketWriteResult result = + serializeAndDeserialize( + new TableBucketWriteResult<>( + tablePath, + tableBucket, + partitionName, + testingWriteResult, + 10, + 30L, + 20, + false)); + assertThat(result.tablePath()).isEqualTo(tablePath); + assertThat(result.tableBucket()).isEqualTo(tableBucket); + assertThat(result.partitionName()).isEqualTo(partitionName); + assertThat(result.writeResult().getWriteResult()) .isEqualTo(testingWriteResult.getWriteResult()); - assertThat(deserialized.numberOfWriteResults()).isEqualTo(20); - assertThat(deserialized.isCancelled()).isFalse(); + assertThat(result.isCancelled()).isFalse(); - // verify when writeResult is null - tableBucketWriteResult = - new TableBucketWriteResult<>( - tablePath, tableBucket, partitionName, null, 20, 30L, 30, false); - serialized = tableBucketWriteResultSerializer.serialize(tableBucketWriteResult); - deserialized = - tableBucketWriteResultSerializer.deserialize( - tableBucketWriteResultSerializer.getVersion(), serialized); - assertThat(deserialized.tablePath()).isEqualTo(tablePath); - assertThat(deserialized.tableBucket()).isEqualTo(tableBucket); - assertThat(deserialized.partitionName()).isEqualTo(partitionName); - assertThat(deserialized.writeResult()).isNull(); - assertThat(deserialized.numberOfWriteResults()).isEqualTo(30); - assertThat(deserialized.isCancelled()).isFalse(); + // verify when writeResult is null, cancelled=true + result = + serializeAndDeserialize( + new TableBucketWriteResult<>( + tablePath, tableBucket, partitionName, null, 20, 30L, 30, true)); + assertThat(result.writeResult()).isNull(); + assertThat(result.isCancelled()).isTrue(); + } + + private TableBucketWriteResult serializeAndDeserialize( + TableBucketWriteResult input) throws Exception { + byte[] serialized = tableBucketWriteResultSerializer.serialize(input); + return tableBucketWriteResultSerializer.deserialize( + tableBucketWriteResultSerializer.getVersion(), serialized); + } + + @Test + void testDeserializeVersion1IsBackwardCompatible() throws Exception { + // Manually construct a version-1 payload (no cancelled flag) and verify + // that it deserializes correctly with cancelled defaulting to false. + TablePath tablePath = TablePath.of("db1", "tb1"); + TableBucket tableBucket = new TableBucket(1, 2); + TestingWriteResult testingWriteResult = new TestingWriteResult(42); + TestingWriteResultSerializer writeResultSerializer = new TestingWriteResultSerializer(); + + DataOutputSerializer out = new DataOutputSerializer(64); + // table path + out.writeUTF(tablePath.getDatabaseName()); + out.writeUTF(tablePath.getTableName()); + // bucket (no partition) + out.writeLong(tableBucket.getTableId()); + out.writeBoolean(false); + out.writeInt(tableBucket.getBucket()); + // write result + byte[] writeResultBytes = writeResultSerializer.serialize(testingWriteResult); + out.writeInt(writeResultBytes.length); + out.write(writeResultBytes); + // log end offset + out.writeLong(100L); + // max timestamp + out.writeLong(200L); + // number of write results + out.writeInt(3); + // NOTE: no cancelled flag — this is a version-1 payload + byte[] v1Bytes = out.getCopyOfBuffer(); + + TableBucketWriteResult deserialized = + tableBucketWriteResultSerializer.deserialize(1, v1Bytes); - // verify when cancelled is true - tableBucketWriteResult = - new TableBucketWriteResult<>( - tablePath, - tableBucket, - partitionName, - testingWriteResult, - 40, - 50L, - 60, - true); - serialized = tableBucketWriteResultSerializer.serialize(tableBucketWriteResult); - deserialized = - tableBucketWriteResultSerializer.deserialize( - tableBucketWriteResultSerializer.getVersion(), serialized); assertThat(deserialized.tablePath()).isEqualTo(tablePath); assertThat(deserialized.tableBucket()).isEqualTo(tableBucket); - assertThat(deserialized.isCancelled()).isTrue(); + assertThat(deserialized.writeResult()).isNotNull(); + assertThat(deserialized.writeResult().getWriteResult()).isEqualTo(42); + assertThat(deserialized.logEndOffset()).isEqualTo(100L); + assertThat(deserialized.maxTimestamp()).isEqualTo(200L); + assertThat(deserialized.numberOfWriteResults()).isEqualTo(3); + // cancelled must default to false when deserializing a version-1 payload + assertThat(deserialized.isCancelled()).isFalse(); } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java index ddda19053d..bcaa10edbd 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java @@ -20,6 +20,7 @@ import org.apache.fluss.client.Connection; import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.flink.tiering.TestingLakeTieringFactory; @@ -177,14 +178,26 @@ void testHandleTieringReachMaxDurationEvent() throws Exception { } @Test - void testHandleTieringTableDroppedEvent() throws Exception { - TablePath tablePath = TablePath.of("fluss", "test_tiering_table_dropped"); - long tableId = createTable(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR); + void testHandlePendingTableDroppedBeforeFetch() throws Exception { + // Create two tables: one as "current" table and one as "pending" table + TablePath currentTablePath = TablePath.of("fluss", "test_current_table"); + long currentTableId = createTable(currentTablePath, DEFAULT_LOG_TABLE_DESCRIPTOR); + + TablePath pendingTablePath = TablePath.of("fluss", "test_pending_table_dropped"); + long pendingTableId = createTable(pendingTablePath, DEFAULT_LOG_TABLE_DESCRIPTOR); + Configuration conf = new Configuration(FLUSS_CLUSTER_EXTENSION.getClientConfig()); conf.set( ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER, ConfigOptions.NoKeyAssigner.ROUND_ROBIN); try (Connection connection = ConnectionFactory.createConnection(conf)) { + // Write some data to current table + writeRows( + connection, + currentTablePath, + Arrays.asList(row(0, "v0"), row(1, "v1"), row(2, "v2")), + true); + FutureCompletingBlockingQueue< RecordsWithSplitIds>> elementsQueue = new FutureCompletingBlockingQueue<>(16); @@ -199,24 +212,17 @@ void testHandleTieringTableDroppedEvent() throws Exception { reader.start(); - // write some data first - writeRows( - connection, - tablePath, - Arrays.asList(row(0, "v0"), row(1, "v1"), row(2, "v2")), - true); - - // add a split for the table - TieringLogSplit split = + // Add split for current table first - it will become the active table + TieringLogSplit currentSplit = new TieringLogSplit( - tablePath, - new TableBucket(tableId, 0), + currentTablePath, + new TableBucket(currentTableId, 0), null, EARLIEST_OFFSET, 100L); - reader.addSplits(Collections.singletonList(split)); + reader.addSplits(Collections.singletonList(currentSplit)); - // wait to run one round of tiering to do some tiering + // Wait for the current table to start tiering FutureCompletingBlockingQueue< RecordsWithSplitIds>> blockingQueue = getElementsQueue(reader); @@ -225,24 +231,38 @@ void testHandleTieringTableDroppedEvent() throws Exception { Duration.ofSeconds(30), "Fail to wait element queue is not empty."); - // send TieringTableDroppedEvent (simulating enumerator's heartbeat detection) - TieringTableDroppedEvent event = new TieringTableDroppedEvent(tableId); - - // Verify TieringTableDroppedEvent accessors for coverage - assertThat(event.getTableId()).isEqualTo(tableId); - assertThat(event).isEqualTo(new TieringTableDroppedEvent(tableId)); - assertThat(event).isNotEqualTo(new TieringTableDroppedEvent(tableId + 1)); - assertThat(event.hashCode()) - .isEqualTo(new TieringTableDroppedEvent(tableId).hashCode()); - assertThat(event.toString()).contains(String.valueOf(tableId)); + // Now add split for pending table - it will go into pendingTieringSplits + TieringLogSplit pendingSplit = + new TieringLogSplit( + pendingTablePath, + new TableBucket(pendingTableId, 0), + null, + EARLIEST_OFFSET, + 100); + reader.addSplits(Collections.singletonList(pendingSplit)); + // Mark the pending table as dropped BEFORE it becomes the active table + TieringTableDroppedEvent event = new TieringTableDroppedEvent(pendingTableId); reader.handleSourceEvents(event); - // actually drop the table so log scanner encounters real - // "unknown table or bucket" error on next fetch - admin.dropTable(tablePath, true).get(); + // Use an independent Connection to drop the table, avoiding + // invalidation of the reader's metadata cache. + // This simulates the real scenario: an external client drops + // the table, and the reader discovers it via fetch response + // error code (UNKNOWN_TABLE_OR_BUCKET). + try (Connection dropConnection = + ConnectionFactory.createConnection( + FLUSS_CLUSTER_EXTENSION.getClientConfig()); + Admin dropAdmin = dropConnection.getAdmin()) { + dropAdmin.dropTable(pendingTablePath, true).get(); + } + + // Force complete the current table so the pending table becomes active + TieringReachMaxDurationEvent maxDurationEvent = + new TieringReachMaxDurationEvent(currentTableId); + reader.handleSourceEvents(maxDurationEvent); - // should force to finish with empty results (dropped table discards data) + // First, complete the current table retry( Duration.ofMinutes(1), () -> { @@ -252,32 +272,30 @@ void testHandleTieringTableDroppedEvent() throws Exception { assertThat(output.getEmittedRecords()).hasSize(1); TableBucketWriteResult result = output.getEmittedRecords().get(0); - // write result should be null since dropped table discards data - assertThat(result.writeResult()).isNull(); + // This should be the current table's result + assertThat(result.tableBucket().getTableId()).isEqualTo(currentTableId); }); - // add another split with skipCurrentRound to verify it's handled correctly - split = - new TieringLogSplit( - tablePath, - new TableBucket(tableId, 1), - null, - EARLIEST_OFFSET, - 100L); - split.skipCurrentRound(); - reader.addSplits(Collections.singletonList(split)); - - // should skip tiering for this split + // Now the pending (dropped) table should become active and complete + // without throwing RPC exception retry( Duration.ofMinutes(1), () -> { - TestingReaderOutput> - output1 = new TestingReaderOutput<>(); - reader.pollNext(output1); - assertThat(output1.getEmittedRecords()).hasSize(1); + TestingReaderOutput> output = + new TestingReaderOutput<>(); + reader.pollNext(output); + assertThat(output.getEmittedRecords()).hasSize(1); TableBucketWriteResult result = - output1.getEmittedRecords().get(0); + output.getEmittedRecords().get(0); + // This should be the pending table's result + assertThat(result.tableBucket().getTableId()).isEqualTo(pendingTableId); + // write result should be null since dropped table discards data assertThat(result.writeResult()).isNull(); + // offset and timestamp should be UNKNOWN + assertThat(result.logEndOffset()).isEqualTo(-1L); + assertThat(result.maxTimestamp()).isEqualTo(-1L); + // should be marked as cancelled + assertThat(result.isCancelled()).isTrue(); }); } } From 1c15b83e0628671e88a4f3a8aca8b09cf97aa00e Mon Sep 17 00:00:00 2001 From: Junbo Wang Date: Tue, 31 Mar 2026 10:01:02 +0800 Subject: [PATCH 4/6] correct version usage in serializer and simplify table drop logic --- .../source/TableBucketWriteResultSerializer.java | 4 +++- .../tiering/source/TieringSourceReaderTest.java | 13 +------------ 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java index 65d7b7927c..31fb0e94a3 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java @@ -131,7 +131,9 @@ public TableBucketWriteResult deserialize(int version, byte[] seria if (writeResultLength >= 0) { byte[] writeResultBytes = new byte[writeResultLength]; in.readFully(writeResultBytes); - writeResult = writeResultSerializer.deserialize(version, writeResultBytes); + writeResult = + writeResultSerializer.deserialize( + writeResultSerializer.getVersion(), writeResultBytes); } else { writeResult = null; } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java index bcaa10edbd..f2583e4562 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java @@ -20,7 +20,6 @@ import org.apache.fluss.client.Connection; import org.apache.fluss.client.ConnectionFactory; -import org.apache.fluss.client.admin.Admin; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.flink.tiering.TestingLakeTieringFactory; @@ -245,17 +244,7 @@ void testHandlePendingTableDroppedBeforeFetch() throws Exception { TieringTableDroppedEvent event = new TieringTableDroppedEvent(pendingTableId); reader.handleSourceEvents(event); - // Use an independent Connection to drop the table, avoiding - // invalidation of the reader's metadata cache. - // This simulates the real scenario: an external client drops - // the table, and the reader discovers it via fetch response - // error code (UNKNOWN_TABLE_OR_BUCKET). - try (Connection dropConnection = - ConnectionFactory.createConnection( - FLUSS_CLUSTER_EXTENSION.getClientConfig()); - Admin dropAdmin = dropConnection.getAdmin()) { - dropAdmin.dropTable(pendingTablePath, true).get(); - } + connection.getAdmin().dropTable(pendingTablePath, true).get(); // Force complete the current table so the pending table becomes active TieringReachMaxDurationEvent maxDurationEvent = From 011aa0d839a1d9dda4ef3478bb8e29e0ece015e6 Mon Sep 17 00:00:00 2001 From: Junbo Wang Date: Tue, 31 Mar 2026 10:05:58 +0800 Subject: [PATCH 5/6] simplify exception handling in getTables method - Removed redundant TableNotExistException catch block that re-threw the same exception - Consolidated exception handling to reduce code duplication - Maintained same error propagation behavior for all other exceptions - Improved code readability by removing unnecessary exception wrapping --- .../org/apache/fluss/server/coordinator/MetadataManager.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index 3115eb3011..cc2bb5b702 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -701,8 +701,6 @@ public Map getTables(Collection tablePaths) .getLakeCatalogContainer() .getDefaultTableLakeOptions())); } - } catch (TableNotExistException e) { - throw e; } catch (Exception e) { throw new FlussRuntimeException( String.format("Failed to get tables '%s'.", tablePaths), e); From ef52bd94ec8dc67fa4267dcf95858295794c0aed Mon Sep 17 00:00:00 2001 From: Junbo Wang Date: Tue, 31 Mar 2026 14:42:20 +0800 Subject: [PATCH 6/6] add test for table dropped handling in source enumerator --- .../enumerator/TieringSourceEnumerator.java | 3 +- .../TieringSourceEnumeratorTest.java | 47 +++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java index 05ed379694..cf59e917b6 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java @@ -476,7 +476,8 @@ private void handleTieringTableResponseErrors(LakeTieringHeartbeatResponse heart * Handle a dropped table by marking all related splits to skip, removing from tiering epochs, * and notifying readers. */ - private void handleTableDropped(long tableId) { + @VisibleForTesting + protected void handleTableDropped(long tableId) { // Remove from tiering table epochs Long tieringEpoch = tieringTableEpochs.remove(tableId); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java index b725f99e7e..17a9c2c612 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java @@ -22,6 +22,7 @@ import org.apache.fluss.flink.tiering.event.FailedTieringEvent; import org.apache.fluss.flink.tiering.event.FinishedTieringEvent; import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; +import org.apache.fluss.flink.tiering.event.TieringTableDroppedEvent; import org.apache.fluss.flink.tiering.source.TieringTestBase; import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit; @@ -838,4 +839,50 @@ void testTableReachMaxTieringDuration() throws Throwable { && !split.shouldSkipCurrentRound()); } } + + @Test + void testHandleTableDropped() throws Throwable { + TablePath tablePath = TablePath.of(DEFAULT_DB, "tiering-dropped-table-test"); + long tableId = createTable(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR); + int numSubtasks = 2; + + appendRow(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10); + + try (FlussMockSplitEnumeratorContext context = + new FlussMockSplitEnumeratorContext<>(numSubtasks); + TieringSourceEnumerator enumerator = + createTieringSourceEnumerator(flussConf, context)) { + enumerator.start(); + + // Register all readers + for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) { + context.registerSourceReader(subtaskId, subtaskId, "localhost-" + subtaskId); + } + + for (int subTask = 0; subTask < numSubtasks; subTask++) { + enumerator.handleSplitRequest(subTask, "localhost-" + subTask); + } + + // Wait for initial assignment - this registers the table in tieringTableEpochs + // Use numSubtasks (not DEFAULT_BUCKET_NUM) since only numSubtasks readers + // request splits, so at most numSubtasks assignments can be made + waitUntilTieringTableSplitAssignmentReady(context, numSubtasks, 200L); + + // Drop the table while tiering is in progress + conn.getAdmin().dropTable(tablePath, true).get(); + + // Directly call handleTableDropped to simulate detection of table drop + // This is similar to how testTableReachMaxTieringDuration directly triggers + // handleTableTieringReachMaxDuration via timer + enumerator.handleTableDropped(tableId); + + // Verify that TieringTableDroppedEvent was sent to all readers + // The containsExactly assertion naturally triggers equals/hashCode coverage + Map> eventsToReaders = context.getSentSourceEvent(); + assertThat(eventsToReaders).hasSize(numSubtasks); + for (Map.Entry> entry : eventsToReaders.entrySet()) { + assertThat(entry.getValue()).contains(new TieringTableDroppedEvent(tableId)); + } + } + } }