From 2db03de1966d36b75e9bf422908304074d21eabd Mon Sep 17 00:00:00 2001 From: forwardxu Date: Sat, 9 May 2026 10:41:37 +0800 Subject: [PATCH 1/7] [flink] Support kv snapshot lease in tiering service (#2898) TieringSourceEnumerator now acquires a KV snapshot lease for all TieringSnapshotSplits before they are assigned to readers, and releases the lease when the table finishes or fails tiering, or when a reader failover returns the splits. A best-effort dropLease is also performed on enumerator close. This prevents the Fluss server from cleaning up snapshots that the tiering job still depends on. One lease id per tiering job (UUID-based) is reused across tables and persisted into TieringSourceEnumeratorState so that it survives enumerator restore instead of leaking orphan leases. The lease uses a fixed 1-day duration that is implicitly renewed by every acquireSnapshots call, and UnsupportedVersionException from older Fluss servers is downgraded to a warning to keep backward compatibility. --- .../flink/tiering/source/TieringSource.java | 8 +- .../enumerator/TieringSourceEnumerator.java | 226 +++++++++++++++++- .../state/TieringSourceEnumeratorState.java | 48 +++- ...ieringSourceEnumeratorStateSerializer.java | 57 ++++- .../TieringSourceEnumeratorTest.java | 120 ++++++++++ ...ngSourceEnumeratorStateSerializerTest.java | 32 ++- 6 files changed, 469 insertions(+), 22 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java index 649b758704..0a3b1d0964 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java @@ -91,9 +91,13 @@ public SplitEnumerator createEnumera public SplitEnumerator restoreEnumerator( SplitEnumeratorContext splitEnumeratorContext, TieringSourceEnumeratorState tieringSourceEnumeratorState) { - // stateless operator + // Restore with the lease id from the checkpoint so that the enumerator reuses + // the same lease instead of leaking orphaned leases. return new TieringSourceEnumerator( - flussConf, splitEnumeratorContext, pollTieringTableIntervalMs); + flussConf, + splitEnumeratorContext, + pollTieringTableIntervalMs, + tieringSourceEnumeratorState.getKvSnapshotLeaseId()); } @Override diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java index 89a10ee898..34104f670f 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java @@ -23,14 +23,17 @@ import org.apache.fluss.client.admin.Admin; import org.apache.fluss.client.metadata.MetadataUpdater; import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.UnsupportedVersionException; import org.apache.fluss.flink.metrics.FlinkMetricRegistry; import org.apache.fluss.flink.tiering.event.FailedTieringEvent; import org.apache.fluss.flink.tiering.event.FinishedTieringEvent; import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; +import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit; import org.apache.fluss.flink.tiering.source.split.TieringSplit; import org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator; import org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState; import org.apache.fluss.lake.committer.TieringStats; +import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.rpc.GatewayClientProxy; @@ -42,6 +45,7 @@ import org.apache.fluss.rpc.messages.PbLakeTieringStats; import org.apache.fluss.rpc.messages.PbLakeTieringTableInfo; import org.apache.fluss.rpc.metrics.ClientMetricGroup; +import org.apache.fluss.utils.ExceptionUtils; import org.apache.flink.api.connector.source.ReaderInfo; import org.apache.flink.api.connector.source.SourceEvent; @@ -56,6 +60,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -64,6 +69,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -96,6 +102,19 @@ public class TieringSourceEnumerator private static final Logger LOG = LoggerFactory.getLogger(TieringSourceEnumerator.class); + /** + * KV snapshot lease duration for the whole tiering job. One lease covers the entire job + * lifecycle; it is renewed implicitly by every {@code acquireSnapshots} call, so a relatively + * long duration is safe and also bounds the worst-case leaked-lease lifetime if the job dies + * abnormally. + * + *

TODO: introduce an explicit periodic lease-renewal mechanism so that a single tiering + * round that exceeds {@link #KV_SNAPSHOT_LEASE_DURATION_MS} (e.g. for very large tables) will + * not see its snapshots garbage-collected mid-flight. Tracked as a follow-up issue; tiering + * rounds are typically minute-level today so a 1-day lease is sufficient in practice. + */ + private static final long KV_SNAPSHOT_LEASE_DURATION_MS = Duration.ofDays(1).toMillis(); + private final Configuration flussConf; private final SplitEnumeratorContext context; private final ScheduledExecutorService timerService; @@ -109,6 +128,18 @@ public class TieringSourceEnumerator private final Map finishedTables; private final Set tieringReachMaxDurationsTables; + /** + * Buckets whose kv snapshots are currently held under the lease, grouped by tableId. Used to + * release the correct bucket subset when a table finishes/fails or when a failover happens. + */ + private final Map> leasedBucketsByTable; + + /** + * A unique lease id for this tiering job. Reused across all tables to keep a single renewal and + * bookkeeping entry on the server side, aligned with the normal Flink Source design. + */ + private final String kvSnapshotLeaseId; + // lazily instantiated private RpcClient rpcClient; private CoordinatorGateway coordinatorGateway; @@ -125,6 +156,19 @@ public TieringSourceEnumerator( Configuration flussConf, SplitEnumeratorContext context, long pollTieringTableIntervalMs) { + this(flussConf, context, pollTieringTableIntervalMs, null); + } + + /** + * Creates a new enumerator, optionally restoring from a previously persisted lease id. + * + * @param restoredLeaseId the lease id from a previous checkpoint, or null for fresh start + */ + public TieringSourceEnumerator( + Configuration flussConf, + SplitEnumeratorContext context, + long pollTieringTableIntervalMs, + @Nullable String restoredLeaseId) { this.flussConf = flussConf; this.context = context; this.timerService = @@ -138,6 +182,11 @@ public TieringSourceEnumerator( this.finishedTables = new ConcurrentHashMap<>(); this.failedTableEpochs = new ConcurrentHashMap<>(); this.tieringReachMaxDurationsTables = Collections.synchronizedSet(new TreeSet<>()); + // Thread safety: outer map is ConcurrentHashMap, values are ConcurrentHashMap-backed + // Sets. Reads/writes are safe across the coordinator thread and the timer thread. + this.leasedBucketsByTable = new ConcurrentHashMap<>(); + this.kvSnapshotLeaseId = + restoredLeaseId != null ? restoredLeaseId : "tiering-" + UUID.randomUUID(); } @Override @@ -279,6 +328,8 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { TieringFinishInfo.from( tieringEpoch, isForceFinished, finishedTieringEvent.getStats())); } + // release the kv snapshot lease held for this table (if any). + maybeReleaseKvSnapshotLease(finishedTableId); } if (sourceEvent instanceof FailedTieringEvent) { @@ -297,6 +348,8 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { } else { failedTableEpochs.put(failedTableId, tieringEpoch); } + // release the kv snapshot lease held for this table (if any). + maybeReleaseKvSnapshotLease(failedTableId); } if (!finishedTables.isEmpty() || !failedTableEpochs.isEmpty()) { @@ -313,10 +366,18 @@ private void handleSourceReaderFailOver() { tieringTableEpochs); // we need to make all as failed failedTableEpochs.putAll(new HashMap<>(tieringTableEpochs)); + // release all currently leased buckets for tables that are being marked failed; + // take a snapshot of the keys first to avoid concurrent modification. + Set tableIdsToRelease = new HashSet<>(tieringTableEpochs.keySet()); tieringTableEpochs.clear(); tieringReachMaxDurationsTables.clear(); // also clean all pending splits since we mark all as failed pendingSplits.clear(); + // Release leases asynchronously to avoid blocking the coordinator thread when + // multiple tables are involved and RPC calls may time out. + for (Long tableId : tableIdsToRelease) { + maybeReleaseKvSnapshotLeaseAsync(tableId); + } if (!failedTableEpochs.isEmpty()) { // call one round of heartbeat to notify table has been finished or failed this.context.callAsync( @@ -464,6 +525,10 @@ private void generateTieringSplits(Tuple3 tieringTable) finishedTables.put(tieringTable.f0, TieringFinishInfo.from(tieringTable.f1)); } else { tieringTableEpochs.put(tieringTable.f0, tieringTable.f1); + // Acquire kv snapshot lease for all snapshot splits of this table before they + // are assigned to readers, so that snapshots referenced by these splits will not + // be garbage-collected by the Fluss server while tiering is in progress. + maybeAcquireKvSnapshotLease(tieringTable.f0, tieringSplits); pendingSplits.addAll(tieringSplits); timerService.schedule( @@ -481,7 +546,12 @@ private void generateTieringSplits(Tuple3 tieringTable) } } catch (Exception e) { LOG.warn("Fail to generate Tiering splits for table {}.", tieringTable.f2, e); + // Remove from tieringTableEpochs in case it was already added before the failure. + tieringTableEpochs.remove(tieringTable.f0); failedTableEpochs.put(tieringTable.f0, tieringTable.f1); + // Release any lease that was partially acquired before the failure so the + // server-side snapshot references are not held unnecessarily until lease expiry. + maybeReleaseKvSnapshotLease(tieringTable.f0); } } @@ -494,8 +564,9 @@ private List populateNumberOfTieringSplits(List tier @Override public TieringSourceEnumeratorState snapshotState(long checkpointId) throws Exception { - // do nothing, the downstream lake committer will snapshot the state to Fluss Cluster - return new TieringSourceEnumeratorState(); + // Persist the lease id so that on restore we can reuse the same id instead of leaking + // an orphaned lease on the server. + return new TieringSourceEnumeratorState(kvSnapshotLeaseId); } @Override @@ -515,6 +586,18 @@ public void close() throws IOException { LOG.error("Failed to close Tiering Source enumerator.", e); } } + // NOTE: we intentionally do NOT drop the kv snapshot lease here. The lease id is + // persisted into the enumerator checkpoint state and will be reused by the restored + // enumerator after a JM failover. Dropping it on close would destroy the lease that + // the restored enumerator expects to reuse, potentially causing the referenced + // snapshots to be garbage-collected before tiering finishes. The lease will expire + // naturally on the server side (see KV_SNAPSHOT_LEASE_DURATION_MS). + // + // TODO: if the job is cancelled by the user (rather than restarted), the lease will only + // be reclaimed on the server side when it expires (up to KV_SNAPSHOT_LEASE_DURATION_MS). + // We cannot currently distinguish "user cancel" from "failover" at the SplitEnumerator + // layer; consider wiring a cancel hook (or using Flink's close(reason) when available) + // so that user-initiated cancellations can drop the lease eagerly. try { if (flussAdmin != null) { LOG.info("Closing Fluss Admin client..."); @@ -533,6 +616,145 @@ public void close() throws IOException { } } + /** + * Acquire kv snapshot lease for all {@link TieringSnapshotSplit}s of the given table so that + * snapshots referenced by these splits will not be cleaned up by the Fluss server during + * tiering. Bucket-snapshot mappings are remembered in {@link #leasedBucketsByTable} so they can + * be released on finish/fail. + * + *

Falls back to a warning (no exception thrown) when the server does not support the kv + * snapshot lease API, to preserve compatibility with older Fluss clusters. + */ + private void maybeAcquireKvSnapshotLease(long tableId, List tieringSplits) { + if (flussAdmin == null) { + return; + } + Map bucketsToLease = new HashMap<>(); + for (TieringSplit split : tieringSplits) { + if (split.isTieringSnapshotSplit()) { + TieringSnapshotSplit snapshotSplit = split.asTieringSnapshotSplit(); + bucketsToLease.put(snapshotSplit.getTableBucket(), snapshotSplit.getSnapshotId()); + } + } + if (bucketsToLease.isEmpty()) { + return; + } + LOG.info( + "Try to acquire kv snapshot lease {} for tiering table {} with {} buckets.", + kvSnapshotLeaseId, + tableId, + bucketsToLease.size()); + try { + flussAdmin + .createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS) + .acquireSnapshots(bucketsToLease) + .get(); + leasedBucketsByTable + .computeIfAbsent(tableId, k -> ConcurrentHashMap.newKeySet()) + .addAll(bucketsToLease.keySet()); + } catch (Exception e) { + if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class).isPresent()) { + LOG.warn( + "Failed to acquire kv snapshot lease for tiering table {} because the " + + "server does not support kv snapshot lease API. Snapshots may " + + "be cleaned up earlier than expected. Please upgrade the Fluss " + + "server to version 0.9 or later.", + tableId, + e); + } else { + LOG.error( + "Failed to acquire kv snapshot lease for tiering table {}. " + + "Tiering will proceed without snapshot protection; the " + + "snapshot may be garbage-collected while tiering is in progress.", + tableId, + e); + } + } + } + + /** + * Release the kv snapshot lease held for a specific table. Called when a table finishes + * tiering, fails, or is abandoned due to failover. Missing leases (log-only tables, or tables + * for which acquire failed) are handled as no-ops. + */ + private void maybeReleaseKvSnapshotLease(long tableId) { + Set buckets = leasedBucketsByTable.remove(tableId); + if (flussAdmin == null || buckets == null || buckets.isEmpty()) { + return; + } + LOG.info( + "Try to release kv snapshot lease {} for tiering table {} with {} buckets.", + kvSnapshotLeaseId, + tableId, + buckets.size()); + try { + flussAdmin + .createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS) + .releaseSnapshots(buckets) + .get(); + } catch (Exception e) { + if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class).isPresent()) { + LOG.warn( + "Failed to release kv snapshot lease for tiering table {} because the " + + "server does not support kv snapshot lease API.", + tableId, + e); + } else { + LOG.error("Failed to release kv snapshot lease for tiering table {}.", tableId, e); + } + } + } + + /** + * Asynchronous variant of {@link #maybeReleaseKvSnapshotLease(long)} used during failover to + * avoid blocking the coordinator thread when multiple tables need to be released. + */ + private void maybeReleaseKvSnapshotLeaseAsync(long tableId) { + Set buckets = leasedBucketsByTable.remove(tableId); + if (flussAdmin == null || buckets == null || buckets.isEmpty()) { + return; + } + LOG.info( + "Asynchronously releasing kv snapshot lease {} for tiering table {} with {} buckets.", + kvSnapshotLeaseId, + tableId, + buckets.size()); + flussAdmin + .createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS) + .releaseSnapshots(buckets) + .whenComplete( + (ignored, e) -> { + if (e != null) { + if (ExceptionUtils.findThrowable( + e, UnsupportedVersionException.class) + .isPresent()) { + LOG.warn( + "Failed to release kv snapshot lease for tiering " + + "table {} because the server does not " + + "support kv snapshot lease API.", + tableId, + e); + } else { + LOG.error( + "Failed to release kv snapshot lease for tiering " + + "table {} during failover.", + tableId, + e); + } + } + }); + } + + @VisibleForTesting + String getKvSnapshotLeaseId() { + return kvSnapshotLeaseId; + } + + @VisibleForTesting + Map> getLeasedBucketsByTable() { + return leasedBucketsByTable; + } + /** * Report failed table to Fluss coordinator via HeartBeat, this method should be called when * {@link TieringSourceEnumerator} is closed or receives failed table from downstream lake diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorState.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorState.java index 6e7bceefaf..0f40d9bfbc 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorState.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorState.java @@ -19,23 +19,55 @@ import org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator; -/** The marker class of stateless component {@link TieringSourceEnumerator}. */ +import javax.annotation.Nullable; + +import java.util.Objects; + +/** + * The state of the {@link TieringSourceEnumerator}. Stores the KV snapshot lease id so that it can + * be recovered after a checkpoint restore, avoiding orphaned leases on the server side. + */ public class TieringSourceEnumeratorState { - public TieringSourceEnumeratorState() {} + + /** + * The KV snapshot lease id used by this tiering job. May be null for jobs that were + * checkpointed before lease support was introduced. + */ + @Nullable private final String kvSnapshotLeaseId; + + /** Creates a state with no lease id (backward compatible with older checkpoints). */ + public TieringSourceEnumeratorState() { + this(null); + } + + public TieringSourceEnumeratorState(@Nullable String kvSnapshotLeaseId) { + this.kvSnapshotLeaseId = kvSnapshotLeaseId; + } + + @Nullable + public String getKvSnapshotLeaseId() { + return kvSnapshotLeaseId; + } @Override - public boolean equals(Object that) { - if (this == that) { + public boolean equals(Object o) { + if (this == o) { return true; } - if (that != null) { - return this.toString().equals(that.toString()); + if (o == null || getClass() != o.getClass()) { + return false; } - return false; + TieringSourceEnumeratorState that = (TieringSourceEnumeratorState) o; + return Objects.equals(kvSnapshotLeaseId, that.kvSnapshotLeaseId); + } + + @Override + public int hashCode() { + return Objects.hash(kvSnapshotLeaseId); } @Override public String toString() { - return "SourceEnumeratorState{}"; + return "TieringSourceEnumeratorState{" + "kvSnapshotLeaseId='" + kvSnapshotLeaseId + "'}"; } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializer.java index cff0fea4f8..b573982daa 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializer.java @@ -18,18 +18,34 @@ package org.apache.fluss.flink.tiering.source.state; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; import java.io.IOException; -/** Serializer for {@link TieringSourceEnumeratorState}. */ +/** + * Serializer for {@link TieringSourceEnumeratorState}. + * + *

Version Evolution:

+ * + *
    + *
  • Version 0: Empty state (stateless marker). + *
  • Version 1 (Current): Stores the KV snapshot lease id so that it can be recovered + * after a checkpoint restore. + *
+ */ public class TieringSourceEnumeratorStateSerializer implements SimpleVersionedSerializer { public static final TieringSourceEnumeratorStateSerializer INSTANCE = new TieringSourceEnumeratorStateSerializer(); - private static final int VERSION = 0; - private static final int CURRENT_VERSION = VERSION; + private static final int VERSION_0 = 0; + private static final int VERSION_1 = 1; + private static final int CURRENT_VERSION = VERSION_1; + + private static final ThreadLocal SERIALIZER_CACHE = + ThreadLocal.withInitial(() -> new DataOutputSerializer(64)); @Override public int getVersion() { @@ -38,14 +54,41 @@ public int getVersion() { @Override public byte[] serialize(TieringSourceEnumeratorState obj) throws IOException { - // no need to store anything - return new byte[0]; + final DataOutputSerializer out = SERIALIZER_CACHE.get(); + String leaseId = obj.getKvSnapshotLeaseId(); + if (leaseId != null) { + out.writeBoolean(true); + out.writeUTF(leaseId); + } else { + out.writeBoolean(false); + } + final byte[] result = out.getCopyOfBuffer(); + out.clear(); + return result; } @Override public TieringSourceEnumeratorState deserialize(int version, byte[] serialized) throws IOException { - // new state - return new TieringSourceEnumeratorState(); + switch (version) { + case VERSION_0: + // v0 was an empty state; no lease id persisted + return new TieringSourceEnumeratorState(); + case VERSION_1: + return deserializeV1(serialized); + default: + throw new IOException( + String.format( + "The bytes are serialized with version %d, " + + "while this deserializer only supports version up to %d", + version, CURRENT_VERSION)); + } + } + + private TieringSourceEnumeratorState deserializeV1(byte[] serialized) throws IOException { + DataInputDeserializer in = new DataInputDeserializer(serialized); + boolean hasLeaseId = in.readBoolean(); + String leaseId = hasLeaseId ? in.readUTF() : null; + return new TieringSourceEnumeratorState(leaseId); } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java index b725f99e7e..3c8b7b4546 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java @@ -185,6 +185,12 @@ void testPrimaryKeyTableWithSnapshotSplits() throws Throwable { assertThat(actualAssignment) .containsExactlyInAnyOrderElementsOf(expectedSnapshotAssignment); + // Lease should have been acquired for this table's snapshot buckets after the + // snapshot splits are generated and assigned. + assertThat(enumerator.getLeasedBucketsByTable()).containsOnlyKeys(tableId); + assertThat(enumerator.getLeasedBucketsByTable().get(tableId)) + .hasSize(DEFAULT_BUCKET_NUM); + // mock finished tiered this round, check second round context.getSplitsAssignmentSequence().clear(); final Map initialBucketOffsets = new HashMap<>(); @@ -200,6 +206,9 @@ void testPrimaryKeyTableWithSnapshotSplits() throws Throwable { enumerator.handleSourceEvent(1, new FinishedTieringEvent(tableId)); + // Once the table is finished, the lease held for its buckets should be released. + assertThat(enumerator.getLeasedBucketsByTable()).doesNotContainKey(tableId); + Map bucketOffsetOfSecondWrite = upsertRow(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, 10, 20); triggerAndWaitSnapshot(tableId); @@ -730,6 +739,117 @@ private TieringSourceEnumerator createTieringSourceEnumerator( return new TieringSourceEnumerator(flussConf, context, 500); } + @Test + void testLeaseIdRestoredFromState() throws Throwable { + TablePath tablePath = TablePath.of(DEFAULT_DB, "tiering-lease-restore-test"); + long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); + int numSubtasks = 3; + + upsertRow(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, 0, 10); + triggerAndWaitSnapshot(tableId); + + String capturedLeaseId; + // First enumerator: create and capture the lease id + try (FlussMockSplitEnumeratorContext context = + new FlussMockSplitEnumeratorContext<>(numSubtasks)) { + TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); + enumerator.start(); + capturedLeaseId = enumerator.getKvSnapshotLeaseId(); + assertThat(capturedLeaseId).startsWith("tiering-"); + } + + // Second enumerator: restore from state with same lease id + try (FlussMockSplitEnumeratorContext context = + new FlussMockSplitEnumeratorContext<>(numSubtasks)) { + TieringSourceEnumerator restoredEnumerator = + new TieringSourceEnumerator(flussConf, context, 500, capturedLeaseId); + restoredEnumerator.start(); + assertThat(restoredEnumerator.getKvSnapshotLeaseId()).isEqualTo(capturedLeaseId); + restoredEnumerator.close(); + } + } + + @Test + void testLeaseReleasedOnFailedTieringEvent() throws Throwable { + TablePath tablePath = TablePath.of(DEFAULT_DB, "tiering-lease-fail-test"); + long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); + int numSubtasks = 3; + + upsertRow(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, 0, 10); + triggerAndWaitSnapshot(tableId); + + try (FlussMockSplitEnumeratorContext context = + new FlussMockSplitEnumeratorContext<>(numSubtasks)) { + TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); + enumerator.start(); + + // register all readers + registerReaderAndHandleSplitRequests(context, enumerator, numSubtasks, 0); + waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 3000L); + + // Lease should be acquired + assertThat(enumerator.getLeasedBucketsByTable()).containsOnlyKeys(tableId); + + // Simulate failed tiering event + enumerator.handleSourceEvent(1, new FailedTieringEvent(tableId, "test failure")); + + // Lease should be released after failure + assertThat(enumerator.getLeasedBucketsByTable()).doesNotContainKey(tableId); + } + } + + @Test + void testLeaseReleasedOnReaderFailover() throws Throwable { + TablePath tablePath = TablePath.of(DEFAULT_DB, "tiering-lease-failover-test"); + long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); + int numSubtasks = 3; + + upsertRow(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, 0, 10); + triggerAndWaitSnapshot(tableId); + + try (FlussMockSplitEnumeratorContext context = + new FlussMockSplitEnumeratorContext<>(numSubtasks)) { + TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); + enumerator.start(); + + // register all readers with attempt 0 + registerReaderAndHandleSplitRequests(context, enumerator, numSubtasks, 0); + waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 3000L); + + // Lease should be acquired + assertThat(enumerator.getLeasedBucketsByTable()).containsOnlyKeys(tableId); + + // Simulate reader failover (attempt 1) + context.getSplitsAssignmentSequence().clear(); + registerReaderAndHandleSplitRequests(context, enumerator, numSubtasks, 1); + + // After failover, all leases for the failed tables should be released + assertThat(enumerator.getLeasedBucketsByTable()).doesNotContainKey(tableId); + } + } + + @Test + void testLogOnlyTableDoesNotAcquireLease() throws Throwable { + TablePath tablePath = TablePath.of(DEFAULT_DB, "tiering-lease-log-only-test"); + createTable(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR); + int numSubtasks = 3; + + appendRow(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10); + + try (FlussMockSplitEnumeratorContext context = + new FlussMockSplitEnumeratorContext<>(numSubtasks)) { + TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); + enumerator.start(); + + // register all readers + registerReaderAndHandleSplitRequests(context, enumerator, numSubtasks, 0); + waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 3000L); + + // Log-only table should not have any leased buckets + assertThat(enumerator.getLeasedBucketsByTable()).isEmpty(); + } + } + @Test void testTableReachMaxTieringDuration() throws Throwable { TablePath tablePath = TablePath.of(DEFAULT_DB, "tiering-max-duration-test-log-table"); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializerTest.java index 83e1dc5707..fb3f4de09f 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializerTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializerTest.java @@ -31,12 +31,38 @@ class TieringSourceEnumeratorStateSerializerTest { TieringSourceEnumeratorStateSerializer.INSTANCE; @Test - void testSerDeserialize() throws Exception { - TieringSourceEnumeratorState state = new TieringSourceEnumeratorState(); + void testSerDeserializeWithLeaseId() throws Exception { + TieringSourceEnumeratorState state = + new TieringSourceEnumeratorState("tiering-test-lease-123"); + byte[] serialized = serializer.serialize(state); + assertThat(serialized.length).isGreaterThan(0); + TieringSourceEnumeratorState deserialized = + serializer.deserialize(serializer.getVersion(), serialized); + assertThat(deserialized).isEqualTo(state); + assertThat(deserialized.getKvSnapshotLeaseId()).isEqualTo("tiering-test-lease-123"); + } + + @Test + void testSerDeserializeWithNullLeaseId() throws Exception { + TieringSourceEnumeratorState state = new TieringSourceEnumeratorState(null); byte[] serialized = serializer.serialize(state); - assertThat(serialized).hasSize(0); TieringSourceEnumeratorState deserialized = serializer.deserialize(serializer.getVersion(), serialized); assertThat(deserialized).isEqualTo(state); + assertThat(deserialized.getKvSnapshotLeaseId()).isNull(); + } + + @Test + void testV0Compatibility() throws Exception { + // v0 serialized an empty byte array; deserialization should produce a state with null + // lease id + TieringSourceEnumeratorState deserialized = serializer.deserialize(0, new byte[0]); + assertThat(deserialized.getKvSnapshotLeaseId()).isNull(); + } + + @Test + void testDefaultConstructorHasNullLeaseId() { + TieringSourceEnumeratorState state = new TieringSourceEnumeratorState(); + assertThat(state.getKvSnapshotLeaseId()).isNull(); } } From b626575163bea43c120b90acf295d7bbbb986d7e Mon Sep 17 00:00:00 2001 From: forwardxu Date: Sat, 9 May 2026 14:39:28 +0800 Subject: [PATCH 2/7] Address review comments: robust serializer clear, retain bucket tracking on lease release failure, log unavailable snapshots on acquire --- .../enumerator/TieringSourceEnumerator.java | 152 ++++++++++++++---- ...ieringSourceEnumeratorStateSerializer.java | 23 +-- 2 files changed, 134 insertions(+), 41 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java index 34104f670f..82916312df 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java @@ -233,6 +233,19 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname LOG.info("TieringSourceReader {} requests split.", subtaskId); readersAwaitingSplit.add(subtaskId); + // During failover we must not request a new tiering table or assign any splits. + // Otherwise we might re-acquire leases for a table whose leases were just released by + // handleSourceReaderFailOver, or assign splits to subtasks whose readers from the new + // attempt have not been fully registered yet. The pending failed-table report and the + // subsequent split request will be driven by the periodic callAsync once failover is + // marked complete. + if (isFailOvering) { + LOG.info( + "Skip handling split request from subtask {} because the enumerator is in failover.", + subtaskId); + return; + } + // If pending splits exist, assign them directly to the requesting reader if (!pendingSplits.isEmpty()) { assignSplits(); @@ -294,11 +307,23 @@ public void addReader(int subtaskId) { int globalMaxAttempt = max(maxAttempts); if (maxAttempts.size() == 1 && globalMaxAttempt >= 1) { LOG.info( - "Failover completed. All {} subtasks reached the same attempt number {}. Current registered readers are {}", + "All {} subtasks reached the same attempt number {}. Current registered readers are {}. Waiting for failed-table report to complete before clearing failover state.", context.currentParallelism(), globalMaxAttempt, context.registeredReadersOfAttempts()); - isFailOvering = false; + // Drive a heartbeat round to report the failed tables and request a + // fresh tiering table; clear the failover flag only after that round + // completes, so that no split is assigned and no lease is re-acquired + // for a table whose leases were just released during failover. + this.context.callAsync( + this::requestTieringTableSplitsViaHeartBeat, + (tieringTable, throwable) -> { + isFailOvering = false; + LOG.info( + "Failover completed for attempt {}. Cleared failover flag.", + globalMaxAttempt); + generateAndAssignSplits(tieringTable, throwable); + }); } } } @@ -645,13 +670,33 @@ private void maybeAcquireKvSnapshotLease(long tableId, List tierin tableId, bucketsToLease.size()); try { - flussAdmin - .createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS) - .acquireSnapshots(bucketsToLease) - .get(); - leasedBucketsByTable - .computeIfAbsent(tableId, k -> ConcurrentHashMap.newKeySet()) - .addAll(bucketsToLease.keySet()); + Set unavailableBuckets = + flussAdmin + .createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS) + .acquireSnapshots(bucketsToLease) + .get() + .getUnavailableTableBucketSet(); + // Only record successfully leased buckets so we don't later try to release + // buckets that were never actually acquired (e.g. snapshots already GC'ed or + // missing on the server). + Set acquiredBuckets = new HashSet<>(bucketsToLease.keySet()); + if (!unavailableBuckets.isEmpty()) { + LOG.warn( + "Failed to acquire kv snapshot lease for {} of {} buckets of tiering " + + "table {}: {}. The corresponding snapshots may have already " + + "been garbage-collected; tiering for those buckets may fail " + + "later when the snapshots are accessed.", + unavailableBuckets.size(), + bucketsToLease.size(), + tableId, + unavailableBuckets); + acquiredBuckets.removeAll(unavailableBuckets); + } + if (!acquiredBuckets.isEmpty()) { + leasedBucketsByTable + .computeIfAbsent(tableId, k -> ConcurrentHashMap.newKeySet()) + .addAll(acquiredBuckets); + } } catch (Exception e) { if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class).isPresent()) { LOG.warn( @@ -678,8 +723,13 @@ private void maybeAcquireKvSnapshotLease(long tableId, List tierin * for which acquire failed) are handled as no-ops. */ private void maybeReleaseKvSnapshotLease(long tableId) { - Set buckets = leasedBucketsByTable.remove(tableId); + // Peek the buckets without removing so that, if the release RPC fails, we can keep + // the entry in leasedBucketsByTable for a later retry instead of permanently + // forgetting which buckets are still leased on the server side. + Set buckets = leasedBucketsByTable.get(tableId); if (flussAdmin == null || buckets == null || buckets.isEmpty()) { + // Nothing to release; clean up any empty entry. + leasedBucketsByTable.remove(tableId); return; } LOG.info( @@ -687,20 +737,39 @@ private void maybeReleaseKvSnapshotLease(long tableId) { kvSnapshotLeaseId, tableId, buckets.size()); + // Take a defensive copy of the buckets to release so concurrent updates to the + // tracked set do not affect the in-flight RPC payload. + Set bucketsToRelease = new HashSet<>(buckets); try { flussAdmin .createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS) - .releaseSnapshots(buckets) + .releaseSnapshots(bucketsToRelease) .get(); + // Only drop the bookkeeping entry after the server confirms the release. + Set tracked = leasedBucketsByTable.get(tableId); + if (tracked != null) { + tracked.removeAll(bucketsToRelease); + if (tracked.isEmpty()) { + leasedBucketsByTable.remove(tableId); + } + } } catch (Exception e) { if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class).isPresent()) { + // Server does not support the lease API; drop tracking since release is a + // no-op and there is no point retrying. + leasedBucketsByTable.remove(tableId); LOG.warn( "Failed to release kv snapshot lease for tiering table {} because the " + "server does not support kv snapshot lease API.", tableId, e); } else { - LOG.error("Failed to release kv snapshot lease for tiering table {}.", tableId, e); + // Keep the buckets tracked so we (or the next failover/close) can retry. + LOG.error( + "Failed to release kv snapshot lease for tiering table {}; the buckets " + + "remain tracked and will be retried on next release attempt.", + tableId, + e); } } } @@ -710,37 +779,56 @@ private void maybeReleaseKvSnapshotLease(long tableId) { * avoid blocking the coordinator thread when multiple tables need to be released. */ private void maybeReleaseKvSnapshotLeaseAsync(long tableId) { - Set buckets = leasedBucketsByTable.remove(tableId); - if (flussAdmin == null || buckets == null || buckets.isEmpty()) { + // Peek the buckets without removing them yet; only drop the tracking entry once the + // server confirms the release. This way a transient failure (e.g. RPC timeout) does + // not silently leak the lease until expiry. + Set tracked = leasedBucketsByTable.get(tableId); + if (flussAdmin == null || tracked == null || tracked.isEmpty()) { + leasedBucketsByTable.remove(tableId); return; } + Set bucketsToRelease = new HashSet<>(tracked); LOG.info( "Asynchronously releasing kv snapshot lease {} for tiering table {} with {} buckets.", kvSnapshotLeaseId, tableId, - buckets.size()); + bucketsToRelease.size()); flussAdmin .createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS) - .releaseSnapshots(buckets) + .releaseSnapshots(bucketsToRelease) .whenComplete( (ignored, e) -> { - if (e != null) { - if (ExceptionUtils.findThrowable( - e, UnsupportedVersionException.class) - .isPresent()) { - LOG.warn( - "Failed to release kv snapshot lease for tiering " - + "table {} because the server does not " - + "support kv snapshot lease API.", - tableId, - e); - } else { - LOG.error( - "Failed to release kv snapshot lease for tiering " - + "table {} during failover.", - tableId, - e); + if (e == null) { + Set remaining = leasedBucketsByTable.get(tableId); + if (remaining != null) { + remaining.removeAll(bucketsToRelease); + if (remaining.isEmpty()) { + leasedBucketsByTable.remove(tableId); + } } + return; + } + if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class) + .isPresent()) { + // Server does not support the lease API; tracking is useless, + // drop it so we don't keep retrying. + leasedBucketsByTable.remove(tableId); + LOG.warn( + "Failed to release kv snapshot lease for tiering " + + "table {} because the server does not " + + "support kv snapshot lease API.", + tableId, + e); + } else { + // Keep the buckets tracked so a later failover/close can + // retry the release instead of leaking the lease until + // expiry. + LOG.error( + "Failed to release kv snapshot lease for tiering " + + "table {} during failover; the buckets " + + "remain tracked and will be retried later.", + tableId, + e); } }); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializer.java index b573982daa..7e8dc34973 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializer.java @@ -55,16 +55,21 @@ public int getVersion() { @Override public byte[] serialize(TieringSourceEnumeratorState obj) throws IOException { final DataOutputSerializer out = SERIALIZER_CACHE.get(); - String leaseId = obj.getKvSnapshotLeaseId(); - if (leaseId != null) { - out.writeBoolean(true); - out.writeUTF(leaseId); - } else { - out.writeBoolean(false); + try { + String leaseId = obj.getKvSnapshotLeaseId(); + if (leaseId != null) { + out.writeBoolean(true); + out.writeUTF(leaseId); + } else { + out.writeBoolean(false); + } + return out.getCopyOfBuffer(); + } finally { + // Always clear the cached ThreadLocal serializer, even if a write throws, so that + // a partially-written buffer does not corrupt subsequent serializations on this + // thread. + out.clear(); } - final byte[] result = out.getCopyOfBuffer(); - out.clear(); - return result; } @Override From 1be6a585ce32df8931d1383e579dbca6c451f96f Mon Sep 17 00:00:00 2001 From: forwardxu Date: Sat, 9 May 2026 15:58:51 +0800 Subject: [PATCH 3/7] [Tiering] Fix CI timeout issue by optimizing test wait logic - Reduce retry timeout from 30s to 5s in testTableReachMaxTieringDuration - Add 10s timeout protection to waitUntilTieringTableSplitAssignmentReady method - Reduce sleep time from 3000ms to 1000ms for faster test execution - Fix checkstyle trailing whitespace issues These changes prevent 60-minute CI timeout by reducing excessive waiting in tests. --- .../TieringSourceEnumeratorTest.java | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java index 3c8b7b4546..ceebf1c5e7 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java @@ -166,7 +166,7 @@ void testPrimaryKeyTableWithSnapshotSplits() throws Throwable { // register all readers registerReaderAndHandleSplitRequests(context, enumerator, numSubtasks, 0); - waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 3000L); + waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 1000L); List expectedSnapshotAssignment = new ArrayList<>(); for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) { @@ -457,7 +457,7 @@ void testPartitionedLogTableSplits() throws Throwable { 0, 1); - waitUntilTieringTableSplitAssignmentReady(context, partitionNameByIds.size(), 3000L); + waitUntilTieringTableSplitAssignmentReady(context, partitionNameByIds.size(), 1000L); List expectedAssignment = new ArrayList<>(); for (Map.Entry partitionNameById : partitionNameByIds.entrySet()) { @@ -705,13 +705,25 @@ private void waitUntilTieringTableSplitAssignmentReady( int expectedSplitsNum, long sleepMs) throws Throwable { + long startTime = System.currentTimeMillis(); + long timeoutMs = 10000; // 10秒超时 + while (context.getSplitsAssignmentSequence().size() < expectedSplitsNum) { + if (System.currentTimeMillis() - startTime > timeoutMs) { + throw new AssertionError( + String.format( + "等待分配超时: 期望 %d 个分配, 实际 %d 个分配, 超时时间 %dms", + expectedSplitsNum, + context.getSplitsAssignmentSequence().size(), + timeoutMs)); + } + if (!context.getPeriodicCallables().isEmpty()) { context.runPeriodicCallable(0); } else { context.runNextOneTimeCallable(); } - Thread.sleep(sleepMs); + Thread.sleep(Math.min(sleepMs, 100)); // 最大等待100ms } } @@ -785,7 +797,7 @@ void testLeaseReleasedOnFailedTieringEvent() throws Throwable { // register all readers registerReaderAndHandleSplitRequests(context, enumerator, numSubtasks, 0); - waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 3000L); + waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 1000L); // Lease should be acquired assertThat(enumerator.getLeasedBucketsByTable()).containsOnlyKeys(tableId); @@ -814,7 +826,7 @@ void testLeaseReleasedOnReaderFailover() throws Throwable { // register all readers with attempt 0 registerReaderAndHandleSplitRequests(context, enumerator, numSubtasks, 0); - waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 3000L); + waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 1000L); // Lease should be acquired assertThat(enumerator.getLeasedBucketsByTable()).containsOnlyKeys(tableId); @@ -843,7 +855,7 @@ void testLogOnlyTableDoesNotAcquireLease() throws Throwable { // register all readers registerReaderAndHandleSplitRequests(context, enumerator, numSubtasks, 0); - waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 3000L); + waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 1000L); // Log-only table should not have any leased buckets assertThat(enumerator.getLeasedBucketsByTable()).isEmpty(); @@ -877,7 +889,7 @@ void testTableReachMaxTieringDuration() throws Throwable { waitUntilTieringTableSplitAssignmentReady(context, 2, 200L); retry( - Duration.ofSeconds(30), + Duration.ofSeconds(5), () -> { // Verify that TieringReachMaxDurationEvent was sent to all readers // Use reflection to access events sent to readers From 885e5723cf65883b51d0fa2c67a9db6500b38109 Mon Sep 17 00:00:00 2001 From: forwardxu Date: Sat, 9 May 2026 16:44:24 +0800 Subject: [PATCH 4/7] [Tiering] Dispatch async lease release via context.callAsync to avoid blocking coordinator Use SplitEnumeratorContext.callAsync to run the blocking release RPC on the coordinator worker pool and apply bookkeeping updates back on the coordinator thread, so failover does not stall the enumerator while still keeping leasedBucketsByTable mutations single-threaded. Update testLeaseReleasedOnReaderFailover to drive the next one-time callable explicitly so the release RPC is observed before the assertion, without draining subsequent heartbeat callables that would re-acquire a lease for the same table. --- .../enumerator/TieringSourceEnumerator.java | 100 +++++++++++------- .../TieringSourceEnumeratorTest.java | 22 +++- 2 files changed, 81 insertions(+), 41 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java index 82916312df..1c0bfc0231 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java @@ -777,13 +777,27 @@ private void maybeReleaseKvSnapshotLease(long tableId) { /** * Asynchronous variant of {@link #maybeReleaseKvSnapshotLease(long)} used during failover to * avoid blocking the coordinator thread when multiple tables need to be released. + * + *

Implementation note: the actual blocking RPC is dispatched via {@link + * SplitEnumeratorContext#callAsync}, which executes the work on the source coordinator's worker + * pool and routes the completion handler back to the coordinator thread. This gives us: + * + *

    + *
  • The same simple synchronous semantics as {@link #maybeReleaseKvSnapshotLease(long)} for + * state updates (the bookkeeping map is mutated only from the coordinator thread). + *
  • No blocking of the coordinator thread itself, even when many tables are released at + * once during a failover. + *
  • Deterministic behavior under tests, since the mock enumerator context lets the test + * drive the queued one-time callable explicitly. + *
*/ private void maybeReleaseKvSnapshotLeaseAsync(long tableId) { - // Peek the buckets without removing them yet; only drop the tracking entry once the - // server confirms the release. This way a transient failure (e.g. RPC timeout) does - // not silently leak the lease until expiry. + // Peek the buckets without removing so that, if the release RPC fails, we can keep + // the entry in leasedBucketsByTable for a later retry instead of permanently + // forgetting which buckets are still leased on the server side. Set tracked = leasedBucketsByTable.get(tableId); if (flussAdmin == null || tracked == null || tracked.isEmpty()) { + // Nothing to release; clean up any empty entry. leasedBucketsByTable.remove(tableId); return; } @@ -793,44 +807,52 @@ private void maybeReleaseKvSnapshotLeaseAsync(long tableId) { kvSnapshotLeaseId, tableId, bucketsToRelease.size()); - flussAdmin - .createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS) - .releaseSnapshots(bucketsToRelease) - .whenComplete( - (ignored, e) -> { - if (e == null) { - Set remaining = leasedBucketsByTable.get(tableId); - if (remaining != null) { - remaining.removeAll(bucketsToRelease); - if (remaining.isEmpty()) { - leasedBucketsByTable.remove(tableId); - } - } - return; - } - if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class) - .isPresent()) { - // Server does not support the lease API; tracking is useless, - // drop it so we don't keep retrying. + context.callAsync( + () -> { + // Executed on the coordinator worker pool: it is OK to block here + // waiting for the release RPC to complete. + flussAdmin + .createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS) + .releaseSnapshots(bucketsToRelease) + .get(); + return null; + }, + (ignored, e) -> { + // Executed back on the coordinator thread: safe to mutate the + // bookkeeping map without extra synchronization. + if (e == null) { + Set remaining = leasedBucketsByTable.get(tableId); + if (remaining != null) { + remaining.removeAll(bucketsToRelease); + if (remaining.isEmpty()) { leasedBucketsByTable.remove(tableId); - LOG.warn( - "Failed to release kv snapshot lease for tiering " - + "table {} because the server does not " - + "support kv snapshot lease API.", - tableId, - e); - } else { - // Keep the buckets tracked so a later failover/close can - // retry the release instead of leaking the lease until - // expiry. - LOG.error( - "Failed to release kv snapshot lease for tiering " - + "table {} during failover; the buckets " - + "remain tracked and will be retried later.", - tableId, - e); } - }); + } + return; + } + if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class) + .isPresent()) { + // Server does not support the lease API; tracking is useless, + // drop it so we don't keep retrying. + leasedBucketsByTable.remove(tableId); + LOG.warn( + "Failed to release kv snapshot lease for tiering " + + "table {} because the server does not " + + "support kv snapshot lease API.", + tableId, + e); + } else { + // Keep the buckets tracked so a later failover/close can + // retry the release instead of leaking the lease until + // expiry. + LOG.error( + "Failed to release kv snapshot lease for tiering " + + "table {} during failover; the buckets " + + "remain tracked and will be retried later.", + tableId, + e); + } + }); } @VisibleForTesting diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java index ceebf1c5e7..4a78add120 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java @@ -718,12 +718,21 @@ private void waitUntilTieringTableSplitAssignmentReady( timeoutMs)); } + // 优先运行periodic callable,这是主要的分配驱动机制 if (!context.getPeriodicCallables().isEmpty()) { context.runPeriodicCallable(0); - } else { + } + + // 然后运行one-time callable + if (!context.getOneTimeCallables().isEmpty()) { context.runNextOneTimeCallable(); } - Thread.sleep(Math.min(sleepMs, 100)); // 最大等待100ms + + // 如果没有可运行的任务,等待一小段时间 + if (context.getPeriodicCallables().isEmpty() + && context.getOneTimeCallables().isEmpty()) { + Thread.sleep(Math.min(sleepMs, 100)); // 最大等待100ms + } } } @@ -835,6 +844,15 @@ void testLeaseReleasedOnReaderFailover() throws Throwable { context.getSplitsAssignmentSequence().clear(); registerReaderAndHandleSplitRequests(context, enumerator, numSubtasks, 1); + // The lease release is dispatched via context.callAsync to avoid blocking the + // coordinator thread. Drive only the very next one-time callable, which is the + // release task enqueued by handleSourceReaderFailOver. We intentionally do NOT + // drain all queued callables here, because subsequent callables may issue a + // heartbeat that re-acquires a lease for the same table and would mask the + // release we are trying to verify. + assertThat(context.getOneTimeCallables()).isNotEmpty(); + context.runNextOneTimeCallable(); + // After failover, all leases for the failed tables should be released assertThat(enumerator.getLeasedBucketsByTable()).doesNotContainKey(tableId); } From 7743e3c613378be89ecb79017501740a7aee7f81 Mon Sep 17 00:00:00 2001 From: forwardxu Date: Sat, 9 May 2026 17:19:19 +0800 Subject: [PATCH 5/7] [flink] address review comments for kv snapshot lease in tiering --- .../flink/tiering/source/TieringSource.java | 7 +- .../enumerator/TieringSourceEnumerator.java | 206 +++--------------- .../state/TieringSourceEnumeratorState.java | 53 +---- ...ieringSourceEnumeratorStateSerializer.java | 64 +----- .../TieringSourceEnumeratorTest.java | 64 +----- ...ngSourceEnumeratorStateSerializerTest.java | 33 +-- 6 files changed, 44 insertions(+), 383 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java index 0a3b1d0964..b8a46ccd9a 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java @@ -91,13 +91,8 @@ public SplitEnumerator createEnumera public SplitEnumerator restoreEnumerator( SplitEnumeratorContext splitEnumeratorContext, TieringSourceEnumeratorState tieringSourceEnumeratorState) { - // Restore with the lease id from the checkpoint so that the enumerator reuses - // the same lease instead of leaking orphaned leases. return new TieringSourceEnumerator( - flussConf, - splitEnumeratorContext, - pollTieringTableIntervalMs, - tieringSourceEnumeratorState.getKvSnapshotLeaseId()); + flussConf, splitEnumeratorContext, pollTieringTableIntervalMs); } @Override diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java index 1c0bfc0231..07de1a0854 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java @@ -106,14 +106,10 @@ public class TieringSourceEnumerator * KV snapshot lease duration for the whole tiering job. One lease covers the entire job * lifecycle; it is renewed implicitly by every {@code acquireSnapshots} call, so a relatively * long duration is safe and also bounds the worst-case leaked-lease lifetime if the job dies - * abnormally. - * - *

TODO: introduce an explicit periodic lease-renewal mechanism so that a single tiering - * round that exceeds {@link #KV_SNAPSHOT_LEASE_DURATION_MS} (e.g. for very large tables) will - * not see its snapshots garbage-collected mid-flight. Tracked as a follow-up issue; tiering - * rounds are typically minute-level today so a 1-day lease is sufficient in practice. + * abnormally. Tiering rounds are typically minute-level today so a 6-hour lease is more than + * sufficient for most cases. */ - private static final long KV_SNAPSHOT_LEASE_DURATION_MS = Duration.ofDays(1).toMillis(); + private static final long KV_SNAPSHOT_LEASE_DURATION_MS = Duration.ofHours(6).toMillis(); private final Configuration flussConf; private final SplitEnumeratorContext context; @@ -156,19 +152,6 @@ public TieringSourceEnumerator( Configuration flussConf, SplitEnumeratorContext context, long pollTieringTableIntervalMs) { - this(flussConf, context, pollTieringTableIntervalMs, null); - } - - /** - * Creates a new enumerator, optionally restoring from a previously persisted lease id. - * - * @param restoredLeaseId the lease id from a previous checkpoint, or null for fresh start - */ - public TieringSourceEnumerator( - Configuration flussConf, - SplitEnumeratorContext context, - long pollTieringTableIntervalMs, - @Nullable String restoredLeaseId) { this.flussConf = flussConf; this.context = context; this.timerService = @@ -185,8 +168,7 @@ public TieringSourceEnumerator( // Thread safety: outer map is ConcurrentHashMap, values are ConcurrentHashMap-backed // Sets. Reads/writes are safe across the coordinator thread and the timer thread. this.leasedBucketsByTable = new ConcurrentHashMap<>(); - this.kvSnapshotLeaseId = - restoredLeaseId != null ? restoredLeaseId : "tiering-" + UUID.randomUUID(); + this.kvSnapshotLeaseId = "tiering-" + UUID.randomUUID(); } @Override @@ -233,19 +215,6 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname LOG.info("TieringSourceReader {} requests split.", subtaskId); readersAwaitingSplit.add(subtaskId); - // During failover we must not request a new tiering table or assign any splits. - // Otherwise we might re-acquire leases for a table whose leases were just released by - // handleSourceReaderFailOver, or assign splits to subtasks whose readers from the new - // attempt have not been fully registered yet. The pending failed-table report and the - // subsequent split request will be driven by the periodic callAsync once failover is - // marked complete. - if (isFailOvering) { - LOG.info( - "Skip handling split request from subtask {} because the enumerator is in failover.", - subtaskId); - return; - } - // If pending splits exist, assign them directly to the requesting reader if (!pendingSplits.isEmpty()) { assignSplits(); @@ -307,23 +276,11 @@ public void addReader(int subtaskId) { int globalMaxAttempt = max(maxAttempts); if (maxAttempts.size() == 1 && globalMaxAttempt >= 1) { LOG.info( - "All {} subtasks reached the same attempt number {}. Current registered readers are {}. Waiting for failed-table report to complete before clearing failover state.", + "Failover completed. All {} subtasks reached the same attempt number {}. Current registered readers are {}", context.currentParallelism(), globalMaxAttempt, context.registeredReadersOfAttempts()); - // Drive a heartbeat round to report the failed tables and request a - // fresh tiering table; clear the failover flag only after that round - // completes, so that no split is assigned and no lease is re-acquired - // for a table whose leases were just released during failover. - this.context.callAsync( - this::requestTieringTableSplitsViaHeartBeat, - (tieringTable, throwable) -> { - isFailOvering = false; - LOG.info( - "Failover completed for attempt {}. Cleared failover flag.", - globalMaxAttempt); - generateAndAssignSplits(tieringTable, throwable); - }); + isFailOvering = false; } } } @@ -391,17 +348,14 @@ private void handleSourceReaderFailOver() { tieringTableEpochs); // we need to make all as failed failedTableEpochs.putAll(new HashMap<>(tieringTableEpochs)); - // release all currently leased buckets for tables that are being marked failed; - // take a snapshot of the keys first to avoid concurrent modification. - Set tableIdsToRelease = new HashSet<>(tieringTableEpochs.keySet()); tieringTableEpochs.clear(); tieringReachMaxDurationsTables.clear(); // also clean all pending splits since we mark all as failed pendingSplits.clear(); - // Release leases asynchronously to avoid blocking the coordinator thread when - // multiple tables are involved and RPC calls may time out. - for (Long tableId : tableIdsToRelease) { - maybeReleaseKvSnapshotLeaseAsync(tableId); + // Release leases for all currently tracked tables to avoid leaking server-side + // snapshot references when those tables are marked failed during failover. + for (Long tableId : new HashSet<>(leasedBucketsByTable.keySet())) { + maybeReleaseKvSnapshotLease(tableId); } if (!failedTableEpochs.isEmpty()) { // call one round of heartbeat to notify table has been finished or failed @@ -589,9 +543,7 @@ private List populateNumberOfTieringSplits(List tier @Override public TieringSourceEnumeratorState snapshotState(long checkpointId) throws Exception { - // Persist the lease id so that on restore we can reuse the same id instead of leaking - // an orphaned lease on the server. - return new TieringSourceEnumeratorState(kvSnapshotLeaseId); + return new TieringSourceEnumeratorState(); } @Override @@ -611,18 +563,16 @@ public void close() throws IOException { LOG.error("Failed to close Tiering Source enumerator.", e); } } - // NOTE: we intentionally do NOT drop the kv snapshot lease here. The lease id is - // persisted into the enumerator checkpoint state and will be reused by the restored - // enumerator after a JM failover. Dropping it on close would destroy the lease that - // the restored enumerator expects to reuse, potentially causing the referenced - // snapshots to be garbage-collected before tiering finishes. The lease will expire - // naturally on the server side (see KV_SNAPSHOT_LEASE_DURATION_MS). - // - // TODO: if the job is cancelled by the user (rather than restarted), the lease will only - // be reclaimed on the server side when it expires (up to KV_SNAPSHOT_LEASE_DURATION_MS). - // We cannot currently distinguish "user cancel" from "failover" at the SplitEnumerator - // layer; consider wiring a cancel hook (or using Flink's close(reason) when available) - // so that user-initiated cancellations can drop the lease eagerly. + // Release any remaining leases held by this enumerator. The fluss cluster will also + // expire stale leases naturally after KV_SNAPSHOT_LEASE_DURATION_MS, so a best-effort + // release here is sufficient. + for (Long tableId : new HashSet<>(leasedBucketsByTable.keySet())) { + try { + maybeReleaseKvSnapshotLease(tableId); + } catch (Exception e) { + LOG.warn("Failed to release kv snapshot lease for table {} on close.", tableId, e); + } + } try { if (flussAdmin != null) { LOG.info("Closing Fluss Admin client..."); @@ -651,9 +601,6 @@ public void close() throws IOException { * snapshot lease API, to preserve compatibility with older Fluss clusters. */ private void maybeAcquireKvSnapshotLease(long tableId, List tieringSplits) { - if (flussAdmin == null) { - return; - } Map bucketsToLease = new HashMap<>(); for (TieringSplit split : tieringSplits) { if (split.isTieringSnapshotSplit()) { @@ -723,138 +670,37 @@ private void maybeAcquireKvSnapshotLease(long tableId, List tierin * for which acquire failed) are handled as no-ops. */ private void maybeReleaseKvSnapshotLease(long tableId) { - // Peek the buckets without removing so that, if the release RPC fails, we can keep - // the entry in leasedBucketsByTable for a later retry instead of permanently - // forgetting which buckets are still leased on the server side. - Set buckets = leasedBucketsByTable.get(tableId); - if (flussAdmin == null || buckets == null || buckets.isEmpty()) { - // Nothing to release; clean up any empty entry. - leasedBucketsByTable.remove(tableId); + Set bucketsToRelease = leasedBucketsByTable.remove(tableId); + if (bucketsToRelease == null || bucketsToRelease.isEmpty()) { return; } LOG.info( "Try to release kv snapshot lease {} for tiering table {} with {} buckets.", kvSnapshotLeaseId, tableId, - buckets.size()); - // Take a defensive copy of the buckets to release so concurrent updates to the - // tracked set do not affect the in-flight RPC payload. - Set bucketsToRelease = new HashSet<>(buckets); + bucketsToRelease.size()); try { flussAdmin .createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS) .releaseSnapshots(bucketsToRelease) .get(); - // Only drop the bookkeeping entry after the server confirms the release. - Set tracked = leasedBucketsByTable.get(tableId); - if (tracked != null) { - tracked.removeAll(bucketsToRelease); - if (tracked.isEmpty()) { - leasedBucketsByTable.remove(tableId); - } - } } catch (Exception e) { if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class).isPresent()) { - // Server does not support the lease API; drop tracking since release is a - // no-op and there is no point retrying. - leasedBucketsByTable.remove(tableId); LOG.warn( "Failed to release kv snapshot lease for tiering table {} because the " + "server does not support kv snapshot lease API.", tableId, e); } else { - // Keep the buckets tracked so we (or the next failover/close) can retry. LOG.error( - "Failed to release kv snapshot lease for tiering table {}; the buckets " - + "remain tracked and will be retried on next release attempt.", + "Failed to release kv snapshot lease for tiering table {}. The lease " + + "will expire naturally on the server side.", tableId, e); } } } - /** - * Asynchronous variant of {@link #maybeReleaseKvSnapshotLease(long)} used during failover to - * avoid blocking the coordinator thread when multiple tables need to be released. - * - *

Implementation note: the actual blocking RPC is dispatched via {@link - * SplitEnumeratorContext#callAsync}, which executes the work on the source coordinator's worker - * pool and routes the completion handler back to the coordinator thread. This gives us: - * - *

    - *
  • The same simple synchronous semantics as {@link #maybeReleaseKvSnapshotLease(long)} for - * state updates (the bookkeeping map is mutated only from the coordinator thread). - *
  • No blocking of the coordinator thread itself, even when many tables are released at - * once during a failover. - *
  • Deterministic behavior under tests, since the mock enumerator context lets the test - * drive the queued one-time callable explicitly. - *
- */ - private void maybeReleaseKvSnapshotLeaseAsync(long tableId) { - // Peek the buckets without removing so that, if the release RPC fails, we can keep - // the entry in leasedBucketsByTable for a later retry instead of permanently - // forgetting which buckets are still leased on the server side. - Set tracked = leasedBucketsByTable.get(tableId); - if (flussAdmin == null || tracked == null || tracked.isEmpty()) { - // Nothing to release; clean up any empty entry. - leasedBucketsByTable.remove(tableId); - return; - } - Set bucketsToRelease = new HashSet<>(tracked); - LOG.info( - "Asynchronously releasing kv snapshot lease {} for tiering table {} with {} buckets.", - kvSnapshotLeaseId, - tableId, - bucketsToRelease.size()); - context.callAsync( - () -> { - // Executed on the coordinator worker pool: it is OK to block here - // waiting for the release RPC to complete. - flussAdmin - .createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS) - .releaseSnapshots(bucketsToRelease) - .get(); - return null; - }, - (ignored, e) -> { - // Executed back on the coordinator thread: safe to mutate the - // bookkeeping map without extra synchronization. - if (e == null) { - Set remaining = leasedBucketsByTable.get(tableId); - if (remaining != null) { - remaining.removeAll(bucketsToRelease); - if (remaining.isEmpty()) { - leasedBucketsByTable.remove(tableId); - } - } - return; - } - if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class) - .isPresent()) { - // Server does not support the lease API; tracking is useless, - // drop it so we don't keep retrying. - leasedBucketsByTable.remove(tableId); - LOG.warn( - "Failed to release kv snapshot lease for tiering " - + "table {} because the server does not " - + "support kv snapshot lease API.", - tableId, - e); - } else { - // Keep the buckets tracked so a later failover/close can - // retry the release instead of leaking the lease until - // expiry. - LOG.error( - "Failed to release kv snapshot lease for tiering " - + "table {} during failover; the buckets " - + "remain tracked and will be retried later.", - tableId, - e); - } - }); - } - @VisibleForTesting String getKvSnapshotLeaseId() { return kvSnapshotLeaseId; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorState.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorState.java index 0f40d9bfbc..f0fe05835a 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorState.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorState.java @@ -17,57 +17,8 @@ package org.apache.fluss.flink.tiering.source.state; -import org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator; - -import javax.annotation.Nullable; - -import java.util.Objects; - -/** - * The state of the {@link TieringSourceEnumerator}. Stores the KV snapshot lease id so that it can - * be recovered after a checkpoint restore, avoiding orphaned leases on the server side. - */ +/** The state of the tiering source enumerator. Currently a stateless marker. */ public class TieringSourceEnumeratorState { - /** - * The KV snapshot lease id used by this tiering job. May be null for jobs that were - * checkpointed before lease support was introduced. - */ - @Nullable private final String kvSnapshotLeaseId; - - /** Creates a state with no lease id (backward compatible with older checkpoints). */ - public TieringSourceEnumeratorState() { - this(null); - } - - public TieringSourceEnumeratorState(@Nullable String kvSnapshotLeaseId) { - this.kvSnapshotLeaseId = kvSnapshotLeaseId; - } - - @Nullable - public String getKvSnapshotLeaseId() { - return kvSnapshotLeaseId; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TieringSourceEnumeratorState that = (TieringSourceEnumeratorState) o; - return Objects.equals(kvSnapshotLeaseId, that.kvSnapshotLeaseId); - } - - @Override - public int hashCode() { - return Objects.hash(kvSnapshotLeaseId); - } - - @Override - public String toString() { - return "TieringSourceEnumeratorState{" + "kvSnapshotLeaseId='" + kvSnapshotLeaseId + "'}"; - } + public TieringSourceEnumeratorState() {} } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializer.java index 7e8dc34973..b156ba1694 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializer.java @@ -18,34 +18,17 @@ package org.apache.fluss.flink.tiering.source.state; import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.core.memory.DataInputDeserializer; -import org.apache.flink.core.memory.DataOutputSerializer; import java.io.IOException; -/** - * Serializer for {@link TieringSourceEnumeratorState}. - * - *

Version Evolution:

- * - *
    - *
  • Version 0: Empty state (stateless marker). - *
  • Version 1 (Current): Stores the KV snapshot lease id so that it can be recovered - * after a checkpoint restore. - *
- */ +/** Serializer for {@link TieringSourceEnumeratorState}. The state is a stateless marker. */ public class TieringSourceEnumeratorStateSerializer implements SimpleVersionedSerializer { public static final TieringSourceEnumeratorStateSerializer INSTANCE = new TieringSourceEnumeratorStateSerializer(); - private static final int VERSION_0 = 0; - private static final int VERSION_1 = 1; - private static final int CURRENT_VERSION = VERSION_1; - - private static final ThreadLocal SERIALIZER_CACHE = - ThreadLocal.withInitial(() -> new DataOutputSerializer(64)); + private static final int CURRENT_VERSION = 0; @Override public int getVersion() { @@ -54,46 +37,19 @@ public int getVersion() { @Override public byte[] serialize(TieringSourceEnumeratorState obj) throws IOException { - final DataOutputSerializer out = SERIALIZER_CACHE.get(); - try { - String leaseId = obj.getKvSnapshotLeaseId(); - if (leaseId != null) { - out.writeBoolean(true); - out.writeUTF(leaseId); - } else { - out.writeBoolean(false); - } - return out.getCopyOfBuffer(); - } finally { - // Always clear the cached ThreadLocal serializer, even if a write throws, so that - // a partially-written buffer does not corrupt subsequent serializations on this - // thread. - out.clear(); - } + return new byte[0]; } @Override public TieringSourceEnumeratorState deserialize(int version, byte[] serialized) throws IOException { - switch (version) { - case VERSION_0: - // v0 was an empty state; no lease id persisted - return new TieringSourceEnumeratorState(); - case VERSION_1: - return deserializeV1(serialized); - default: - throw new IOException( - String.format( - "The bytes are serialized with version %d, " - + "while this deserializer only supports version up to %d", - version, CURRENT_VERSION)); + if (version != CURRENT_VERSION) { + throw new IOException( + String.format( + "The bytes are serialized with version %d, " + + "while this deserializer only supports version %d", + version, CURRENT_VERSION)); } - } - - private TieringSourceEnumeratorState deserializeV1(byte[] serialized) throws IOException { - DataInputDeserializer in = new DataInputDeserializer(serialized); - boolean hasLeaseId = in.readBoolean(); - String leaseId = hasLeaseId ? in.readUTF() : null; - return new TieringSourceEnumeratorState(leaseId); + return new TieringSourceEnumeratorState(); } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java index 4a78add120..8d3aad8260 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java @@ -705,34 +705,13 @@ private void waitUntilTieringTableSplitAssignmentReady( int expectedSplitsNum, long sleepMs) throws Throwable { - long startTime = System.currentTimeMillis(); - long timeoutMs = 10000; // 10秒超时 - while (context.getSplitsAssignmentSequence().size() < expectedSplitsNum) { - if (System.currentTimeMillis() - startTime > timeoutMs) { - throw new AssertionError( - String.format( - "等待分配超时: 期望 %d 个分配, 实际 %d 个分配, 超时时间 %dms", - expectedSplitsNum, - context.getSplitsAssignmentSequence().size(), - timeoutMs)); - } - - // 优先运行periodic callable,这是主要的分配驱动机制 if (!context.getPeriodicCallables().isEmpty()) { context.runPeriodicCallable(0); - } - - // 然后运行one-time callable - if (!context.getOneTimeCallables().isEmpty()) { + } else { context.runNextOneTimeCallable(); } - - // 如果没有可运行的任务,等待一小段时间 - if (context.getPeriodicCallables().isEmpty() - && context.getOneTimeCallables().isEmpty()) { - Thread.sleep(Math.min(sleepMs, 100)); // 最大等待100ms - } + Thread.sleep(sleepMs); } } @@ -760,36 +739,6 @@ private TieringSourceEnumerator createTieringSourceEnumerator( return new TieringSourceEnumerator(flussConf, context, 500); } - @Test - void testLeaseIdRestoredFromState() throws Throwable { - TablePath tablePath = TablePath.of(DEFAULT_DB, "tiering-lease-restore-test"); - long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); - int numSubtasks = 3; - - upsertRow(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, 0, 10); - triggerAndWaitSnapshot(tableId); - - String capturedLeaseId; - // First enumerator: create and capture the lease id - try (FlussMockSplitEnumeratorContext context = - new FlussMockSplitEnumeratorContext<>(numSubtasks)) { - TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); - enumerator.start(); - capturedLeaseId = enumerator.getKvSnapshotLeaseId(); - assertThat(capturedLeaseId).startsWith("tiering-"); - } - - // Second enumerator: restore from state with same lease id - try (FlussMockSplitEnumeratorContext context = - new FlussMockSplitEnumeratorContext<>(numSubtasks)) { - TieringSourceEnumerator restoredEnumerator = - new TieringSourceEnumerator(flussConf, context, 500, capturedLeaseId); - restoredEnumerator.start(); - assertThat(restoredEnumerator.getKvSnapshotLeaseId()).isEqualTo(capturedLeaseId); - restoredEnumerator.close(); - } - } - @Test void testLeaseReleasedOnFailedTieringEvent() throws Throwable { TablePath tablePath = TablePath.of(DEFAULT_DB, "tiering-lease-fail-test"); @@ -844,15 +793,6 @@ void testLeaseReleasedOnReaderFailover() throws Throwable { context.getSplitsAssignmentSequence().clear(); registerReaderAndHandleSplitRequests(context, enumerator, numSubtasks, 1); - // The lease release is dispatched via context.callAsync to avoid blocking the - // coordinator thread. Drive only the very next one-time callable, which is the - // release task enqueued by handleSourceReaderFailOver. We intentionally do NOT - // drain all queued callables here, because subsequent callables may issue a - // heartbeat that re-acquires a lease for the same table and would mask the - // release we are trying to verify. - assertThat(context.getOneTimeCallables()).isNotEmpty(); - context.runNextOneTimeCallable(); - // After failover, all leases for the failed tables should be released assertThat(enumerator.getLeasedBucketsByTable()).doesNotContainKey(tableId); } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializerTest.java index fb3f4de09f..f990d7e39f 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializerTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializerTest.java @@ -31,38 +31,11 @@ class TieringSourceEnumeratorStateSerializerTest { TieringSourceEnumeratorStateSerializer.INSTANCE; @Test - void testSerDeserializeWithLeaseId() throws Exception { - TieringSourceEnumeratorState state = - new TieringSourceEnumeratorState("tiering-test-lease-123"); - byte[] serialized = serializer.serialize(state); - assertThat(serialized.length).isGreaterThan(0); - TieringSourceEnumeratorState deserialized = - serializer.deserialize(serializer.getVersion(), serialized); - assertThat(deserialized).isEqualTo(state); - assertThat(deserialized.getKvSnapshotLeaseId()).isEqualTo("tiering-test-lease-123"); - } - - @Test - void testSerDeserializeWithNullLeaseId() throws Exception { - TieringSourceEnumeratorState state = new TieringSourceEnumeratorState(null); + void testSerDeserialize() throws Exception { + TieringSourceEnumeratorState state = new TieringSourceEnumeratorState(); byte[] serialized = serializer.serialize(state); TieringSourceEnumeratorState deserialized = serializer.deserialize(serializer.getVersion(), serialized); - assertThat(deserialized).isEqualTo(state); - assertThat(deserialized.getKvSnapshotLeaseId()).isNull(); - } - - @Test - void testV0Compatibility() throws Exception { - // v0 serialized an empty byte array; deserialization should produce a state with null - // lease id - TieringSourceEnumeratorState deserialized = serializer.deserialize(0, new byte[0]); - assertThat(deserialized.getKvSnapshotLeaseId()).isNull(); - } - - @Test - void testDefaultConstructorHasNullLeaseId() { - TieringSourceEnumeratorState state = new TieringSourceEnumeratorState(); - assertThat(state.getKvSnapshotLeaseId()).isNull(); + assertThat(deserialized).isNotNull(); } } From 16ab4ea387042c5b34d7eb741b0ba01c5e1f9290 Mon Sep 17 00:00:00 2001 From: forwardxu Date: Sat, 9 May 2026 17:46:46 +0800 Subject: [PATCH 6/7] [flink] fix lease re-acquisition during reader failover in TieringSourceEnumerator --- .../enumerator/TieringSourceEnumerator.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java index 07de1a0854..7b96e76665 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java @@ -215,6 +215,14 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname LOG.info("TieringSourceReader {} requests split.", subtaskId); readersAwaitingSplit.add(subtaskId); + // Skip the synchronous split request path during failover. Otherwise we may immediately + // re-acquire a kv snapshot lease for the same table that was just released by the + // failover handling, leaving stale lease state until the next periodic poll. The + // periodic callAsync task will resume normal split generation once failover completes. + if (isFailOvering) { + return; + } + // If pending splits exist, assign them directly to the requesting reader if (!pendingSplits.isEmpty()) { assignSplits(); @@ -280,7 +288,13 @@ public void addReader(int subtaskId) { context.currentParallelism(), globalMaxAttempt, context.registeredReadersOfAttempts()); - isFailOvering = false; + // Defer clearing isFailOvering until the next periodic poll runs in + // the coordinator thread. This guarantees that any synchronous + // handleSplitRequest invoked right after addReader still observes + // isFailOvering=true and skips the synchronous fetch path, so we + // don't immediately re-acquire a kv snapshot lease that was just + // released by handleSourceReaderFailOver(). + context.runInCoordinatorThread(() -> isFailOvering = false); } } } From 714a6401754216cbb1376aa19780aaf9081a9ffb Mon Sep 17 00:00:00 2001 From: forwardxu Date: Sun, 10 May 2026 09:02:38 +0800 Subject: [PATCH 7/7] [flink] simplify TieringSourceEnumeratorStateSerializer deserialize logic --- .../TieringSourceEnumeratorStateSerializer.java | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializer.java index b156ba1694..cff0fea4f8 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializer.java @@ -21,14 +21,15 @@ import java.io.IOException; -/** Serializer for {@link TieringSourceEnumeratorState}. The state is a stateless marker. */ +/** Serializer for {@link TieringSourceEnumeratorState}. */ public class TieringSourceEnumeratorStateSerializer implements SimpleVersionedSerializer { public static final TieringSourceEnumeratorStateSerializer INSTANCE = new TieringSourceEnumeratorStateSerializer(); - private static final int CURRENT_VERSION = 0; + private static final int VERSION = 0; + private static final int CURRENT_VERSION = VERSION; @Override public int getVersion() { @@ -37,19 +38,14 @@ public int getVersion() { @Override public byte[] serialize(TieringSourceEnumeratorState obj) throws IOException { + // no need to store anything return new byte[0]; } @Override public TieringSourceEnumeratorState deserialize(int version, byte[] serialized) throws IOException { - if (version != CURRENT_VERSION) { - throw new IOException( - String.format( - "The bytes are serialized with version %d, " - + "while this deserializer only supports version %d", - version, CURRENT_VERSION)); - } + // new state return new TieringSourceEnumeratorState(); } }