[kv] Support kv snapshot lease #2179
Pull request overview
This PR adds support for KV snapshot consumers to prevent premature deletion of snapshots that are still being consumed. The implementation introduces a consumer registration mechanism where clients can register their interest in specific snapshots, preventing them from being deleted until consumption is complete or the consumer expires.
Key Changes:
- Introduced `KvSnapshotConsumer` data structure to track snapshot consumption per consumer
- Implemented `KvSnapshotConsumerManager` in the coordinator to manage consumer lifecycle and expiration
- Modified snapshot retention logic to be consumer-aware rather than based on a fixed retention count
- Added Flink source integration to automatically register/unregister consumers during checkpoint lifecycle
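The retention idea described above can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: the class `SnapshotLeaseSketch` and its methods `acquire`, `release`, `expire`, and `canDelete` are hypothetical names, and the sketch assumes a single bucket and in-memory state, whereas the PR persists state in ZooKeeper and tracks snapshots per table/partition/bucket.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative only: class and method names are hypothetical, not the Fluss API. */
final class SnapshotLeaseSketch {
    /** Lease id -> expiration timestamp in milliseconds. */
    private final Map<String, Long> expirationByLease = new HashMap<>();
    /** Lease id -> snapshot ids that the lease keeps alive. */
    private final Map<String, Set<Long>> snapshotsByLease = new HashMap<>();

    /** Registers interest in a snapshot and (re)sets the lease expiration. */
    void acquire(String leaseId, long snapshotId, long nowMs, long durationMs) {
        expirationByLease.put(leaseId, nowMs + durationMs);
        snapshotsByLease.computeIfAbsent(leaseId, k -> new HashSet<>()).add(snapshotId);
    }

    /** Drops one snapshot from a lease; an empty lease is removed entirely. */
    void release(String leaseId, long snapshotId) {
        Set<Long> pinned = snapshotsByLease.get(leaseId);
        if (pinned == null) {
            return;
        }
        pinned.remove(snapshotId);
        if (pinned.isEmpty()) {
            snapshotsByLease.remove(leaseId);
            expirationByLease.remove(leaseId);
        }
    }

    /** Removes every lease whose expiration time has been reached. */
    void expire(long nowMs) {
        expirationByLease.entrySet().removeIf(entry -> {
            boolean expired = entry.getValue() <= nowMs;
            if (expired) {
                snapshotsByLease.remove(entry.getKey());
            }
            return expired;
        });
    }

    /** A snapshot may be deleted only if no live lease still references it. */
    boolean canDelete(long snapshotId) {
        for (Set<Long> pinned : snapshotsByLease.values()) {
            if (pinned.contains(snapshotId)) {
                return false;
            }
        }
        return true;
    }
}
```

In this sketch the expiration check is what protects the store from clients that crash without releasing: a periodic call to `expire` eventually frees the snapshots they pinned.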
Reviewed changes
Copilot reviewed 65 out of 65 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| KvSnapshotConsumer.java | New data structure tracking consumed snapshots per table/partition/bucket |
| KvSnapshotConsumerManager.java | Manager for consumer registration, unregistration, and expiration checking |
| CompletedSnapshotStore.java | Modified to use consumer-aware retention instead of fixed count |
| FlinkSourceEnumerator.java | Registers consumers when initializing splits, unregisters on checkpoint complete |
| FlinkSourceReader.java | Reports finished snapshot consumption via source events |
| ConfigOptions.java | Added configuration for consumer expiration checking and default expiration time |
| ZooKeeperClient.java | Added methods for persisting consumer state in ZooKeeper |
| Various test files | Removed fixed snapshot retention configuration, added consumer-based tests |
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotConsumerManager.java (5 review threads, outdated, resolved)
fluss-common/src/main/java/org/apache/fluss/metadata/KvSnapshotLeaseForBucket.java (resolved)
fluss-server/src/main/java/org/apache/fluss/server/zk/data/KvSnapshotLease.java (outdated, resolved)
...common/src/main/java/org/apache/fluss/flink/source/event/FinishedKvSnapshotConsumeEvent.java (resolved)
fluss-server/src/main/java/org/apache/fluss/server/zk/data/KvSnapshotConsumerJsonSerde.java (2 review threads, outdated, resolved)
platinumhamburg left a comment
@swuferhong Thank you for undertaking this complex work. I've left some review comments.
fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java (outdated, resolved)
    public static final String SERVER_PHYSICAL_STORAGE_REMOTE_KV_SIZE = "remoteKvSize";

    // for kv snapshot lease.
    public static final String KV_SNAPSHOT_LEASE_COUNT = "kvSnapshotLeaseCount";
I think it should be implemented at the table level so we can quickly identify the affected business when lease leaks occur.
Sure, I was thinking the same thing. I'll add a TODO for now and implement this in the next PR, since this PR is already too large. Tracked in #2297.
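As a rough illustration of the table-level metric suggested in this thread, leased buckets could be grouped by table so that a leaking lease points at a specific table. This is a hypothetical sketch: `LeaseCountByTableSketch`, `LeasedBucket`, and `countByTable` are invented names, and it does not reflect how #2297 is or will be implemented.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: names are hypothetical, not the Fluss metrics API. */
final class LeaseCountByTableSketch {

    /** One bucket whose snapshot is currently held by some lease. */
    static final class LeasedBucket {
        final long tableId;
        final int bucketId;

        LeasedBucket(long tableId, int bucketId) {
            this.tableId = tableId;
            this.bucketId = bucketId;
        }
    }

    /** Counts leased buckets per table id, sorted by table id. */
    static Map<Long, Integer> countByTable(List<LeasedBucket> leasedBuckets) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (LeasedBucket bucket : leasedBuckets) {
            counts.merge(bucket.tableId, 1, Integer::sum);
        }
        return counts;
    }
}
```

A per-table gauge built on a grouping like this would show which table's count keeps growing, which a single cluster-wide `kvSnapshotLeaseCount` value cannot.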
    import org.apache.flink.table.procedure.ProcedureContext;

    /** Procedure to drop kv snapshot lease. */
    public class DropKvSnapshotLeaseProcedure extends ProcedureBase {
Ditto—“drop” isn’t the most appropriate verb to use with “lease.”
Do you intend to rename this API to release, or just update the comment? According to the FIP, this API is indeed named dropKvSnapshotLease, and its behavior genuinely performs a "drop" operation.
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java (2 review threads, outdated, resolved)
    private static final int VERSION = 1;

    @Override
    public void serialize(KvSnapshotLeaseMetadata kvSnapshotLeaseMetadata, JsonGenerator generator)
I noticed we're using Jackson's very low-level APIs, which makes the serialization/deserialization logic quite complex. What's the reason behind this approach?
This implementation is indeed quite old—it was originally based on Flink's serialization. We could consider refactoring it, but perhaps not in this PR?
...erver/src/main/java/org/apache/fluss/server/zk/data/lease/KvSnapshotTableLeaseJsonSerde.java (resolved)
fluss-server/src/test/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManagerTest.java (outdated, resolved)
Purpose
Linked issue: close #2171
Support kv snapshot lease
Brief change log
Tests
API and Format
Documentation