Skip to content

Conversation

@swuferhong
Copy link
Contributor

@swuferhong swuferhong commented Dec 15, 2025

Purpose

Linked issue: close #2171

Support kv snapshot lease

Brief change log

Tests

API and Format

Documentation

@swuferhong swuferhong force-pushed the kv-snapshot-consumer branch 2 times, most recently from 05bc3d6 to 6611274 Compare December 16, 2025 01:14
@swuferhong swuferhong changed the title [kv] Introduce the API and implement the KV snapshot consumer [kv] Support kv snapshot consumer Dec 16, 2025
@swuferhong swuferhong force-pushed the kv-snapshot-consumer branch 5 times, most recently from a2b00c8 to 7d9f6c2 Compare December 16, 2025 04:11
@luoyuxia luoyuxia requested a review from Copilot December 16, 2025 06:01
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds support for KV snapshot consumers to prevent premature deletion of snapshots that are still being consumed. The implementation introduces a consumer registration mechanism where clients can register their interest in specific snapshots, preventing them from being deleted until consumption is complete or the consumer expires.

Key Changes:

  • Introduced KvSnapshotConsumer data structure to track snapshot consumption per consumer
  • Implemented KvSnapshotConsumerManager in the coordinator to manage consumer lifecycle and expiration
  • Modified snapshot retention logic to be consumer-aware rather than based on a fixed retention count
  • Added Flink source integration to automatically register/unregister consumers during checkpoint lifecycle

Reviewed changes

Copilot reviewed 65 out of 65 changed files in this pull request and generated 11 comments.

Show a summary per file
File Description
KvSnapshotConsumer.java New data structure tracking consumed snapshots per table/partition/bucket
KvSnapshotConsumerManager.java Manager for consumer registration, unregistration, and expiration checking
CompletedSnapshotStore.java Modified to use consumer-aware retention instead of fixed count
FlinkSourceEnumerator.java Registers consumers when initializing splits, unregisters on checkpoint complete
FlinkSourceReader.java Reports finished snapshot consumption via source events
ConfigOptions.java Added configuration for consumer expiration checking and default expiration time
ZooKeeperClient.java Added methods for persisting consumer state in ZooKeeper
Various test files Removed fixed snapshot retention configuration, added consumer-based tests

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@swuferhong swuferhong force-pushed the kv-snapshot-consumer branch 8 times, most recently from 799ab05 to 2a9a846 Compare December 19, 2025 02:59
@swuferhong swuferhong force-pushed the kv-snapshot-consumer branch 2 times, most recently from b53fa8a to c600bf0 Compare December 25, 2025 01:36
@swuferhong swuferhong changed the title [kv] Support kv snapshot consumer [kv] Support kv snapshot lease Dec 25, 2025
@swuferhong swuferhong force-pushed the kv-snapshot-consumer branch 4 times, most recently from d627171 to 31f4087 Compare December 26, 2025 12:24
Copy link
Contributor

@platinumhamburg platinumhamburg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@swuferhong Thank you for undertaking this complex work, I've left some review comments.

public static final String SERVER_PHYSICAL_STORAGE_REMOTE_KV_SIZE = "remoteKvSize";

// for kv snapshot lease.
public static final String KV_SNAPSHOT_LEASE_COUNT = "kvSnapshotLeaseCount";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be implemented at the table level so we can quickly identify the affected business when lease leaks occur.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I was thinking the same thing. I'll add a TODO for now, and this will be implemented in the next PR since this PR is already too large. Trace by: #2297

import org.apache.flink.table.procedure.ProcedureContext;

/** Procedure to drop kv snapshot lease. */
public class DropKvSnapshotLeaseProcedure extends ProcedureBase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto—“drop” isn’t the most appropriate verb to use with “lease.”

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you intend to rename this API to release, or just update the comment? According to the FIP, this API is indeed named dropKvSnapshotLease, and its behavior genuinely performs a "drop" operation.

private static final int VERSION = 1;

@Override
public void serialize(KvSnapshotLeaseMetadata kvSnapshotLeaseMetadata, JsonGenerator generator)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed we're using Jackson's very low-level APIs, which makes the serialization/deserialization logic quite complex. What's the reason behind this approach?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation is indeed quite old—it was originally based on Flink's serialization. We could consider refactoring it, but perhaps not in this PR?

@swuferhong swuferhong force-pushed the kv-snapshot-consumer branch from 0a66881 to da63109 Compare January 3, 2026 08:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support Kv Snapshot Lease

3 participants