From 18f3bc6645580fb5f9cddf1cb21782e8b094b542 Mon Sep 17 00:00:00 2001 From: vaibhav kumar Date: Mon, 11 May 2026 16:19:50 +0530 Subject: [PATCH 1/3] [server] Add authorization to snapshot management RPCs This commit adds CLUSTER/WRITE authorization checks for snapshot management internal RPCs as specified in issue #3250: Server Changes: - TabletService: Add authorization to notifyKvSnapshotOffset and notifyLakeTableOffset - CoordinatorService: Add authorization to commitKvSnapshot and commitLakeTableSnapshot - All methods check CLUSTER/WRITE permission before processing requests - Internal sessions automatically bypass authorization via session.isInternal() Test Coverage: - Add comprehensive test testSnapshotManagementAuthorization() - Test 1-4: Verify AuthorizationException when client lacks CLUSTER/WRITE permission - Test 5: Grant CLUSTER/WRITE permission and verify operations succeed - Test 6: Verify internal server-to-server calls bypass authorization - Tests all 4 snapshot management operations with complete authorization lifecycle These are internal server-to-server RPCs used for snapshot coordination. The authorization prevents external clients from calling internal APIs while allowing legitimate internal cluster operations to proceed. Co-Authored-By: Claude Sonnet 4.5 --- .../acl/FlussAuthorizationITCase.java | 182 ++++++++++++++++++ .../coordinator/CoordinatorService.java | 7 + .../fluss/server/tablet/TabletService.java | 6 + 3 files changed, 195 insertions(+) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java index 0ac000359d..76785d0cc9 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java @@ -51,11 +51,15 @@ import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway; import org.apache.fluss.rpc.gateway.CoordinatorGateway; import org.apache.fluss.rpc.gateway.TabletServerGateway; +import org.apache.fluss.rpc.messages.CommitKvSnapshotRequest; +import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest; import org.apache.fluss.rpc.messages.ControlledShutdownRequest; import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest; import org.apache.fluss.rpc.messages.InitWriterRequest; import org.apache.fluss.rpc.messages.InitWriterResponse; import org.apache.fluss.rpc.messages.MetadataRequest; +import org.apache.fluss.rpc.messages.NotifyKvSnapshotOffsetRequest; +import org.apache.fluss.rpc.messages.NotifyLakeTableOffsetRequest; import org.apache.fluss.rpc.messages.ReleaseKvSnapshotLeaseRequest; import org.apache.fluss.rpc.metrics.TestingClientMetricGroup; import org.apache.fluss.security.acl.AccessControlEntry; @@ -1370,6 +1374,184 @@ void testTableExistsAuthorization() throws Exception { rootAdmin.dropTable(testTablePath, true).get(); } + @Test + void testSnapshotManagementAuthorization() throws Exception { + // These RPCs are internal-only, so we test via direct gateway access + try (RpcClient rpcClient = + RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) { + + TabletServerGateway guestTabletGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("CLIENT").get(0), + rpcClient, + TabletServerGateway.class); + + CoordinatorGateway guestCoordinatorGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"), + rpcClient, + CoordinatorGateway.class); + + // Test 1: notifyKvSnapshotOffset without WRITE permission + NotifyKvSnapshotOffsetRequest notifyKvRequest = new NotifyKvSnapshotOffsetRequest(); + assertThatThrownBy(() -> guestTabletGateway.notifyKvSnapshotOffset(notifyKvRequest).get()) + .rootCause() + .isInstanceOf(AuthorizationException.class) + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + + // Test 2: notifyLakeTableOffset without WRITE permission + NotifyLakeTableOffsetRequest notifyLakeRequest = new NotifyLakeTableOffsetRequest(); + assertThatThrownBy(() -> guestTabletGateway.notifyLakeTableOffset(notifyLakeRequest).get()) + .rootCause() + .isInstanceOf(AuthorizationException.class) + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + + // Test 3: commitKvSnapshot without WRITE permission + CommitKvSnapshotRequest commitKvRequest = new CommitKvSnapshotRequest(); + assertThatThrownBy(() -> guestCoordinatorGateway.commitKvSnapshot(commitKvRequest).get()) + .rootCause() + .isInstanceOf(AuthorizationException.class) + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + + // Test 4: commitLakeTableSnapshot without WRITE permission + CommitLakeTableSnapshotRequest commitLakeRequest = new CommitLakeTableSnapshotRequest(); + assertThatThrownBy( + () -> guestCoordinatorGateway.commitLakeTableSnapshot(commitLakeRequest).get()) + .rootCause() + .isInstanceOf(AuthorizationException.class) + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + } + + // Test 5: Grant CLUSTER/WRITE permission and verify operations succeed + List aclBindings = + Collections.singletonList( + new AclBinding( + Resource.cluster(), + new AccessControlEntry( + guestPrincipal, "*", OperationType.WRITE, PermissionType.ALLOW))); + rootAdmin.createAcls(aclBindings).all().get(); + FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true); + + try (RpcClient authorizedRpcClient = + RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) { + + TabletServerGateway authorizedTabletGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("CLIENT").get(0), + authorizedRpcClient, + TabletServerGateway.class); + + CoordinatorGateway authorizedCoordinatorGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"), + authorizedRpcClient, + CoordinatorGateway.class); + + // Test notifyKvSnapshotOffset with permission + NotifyKvSnapshotOffsetRequest notifyKvRequest = new NotifyKvSnapshotOffsetRequest(); + Throwable thrown1 = + catchThrowable( + () -> authorizedTabletGateway.notifyKvSnapshotOffset(notifyKvRequest).get()); + if (thrown1 != null) { + assertThat(thrown1).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + // Test notifyLakeTableOffset with permission + NotifyLakeTableOffsetRequest notifyLakeRequest = new NotifyLakeTableOffsetRequest(); + Throwable thrown2 = + catchThrowable( + () -> authorizedTabletGateway.notifyLakeTableOffset(notifyLakeRequest).get()); + if (thrown2 != null) { + assertThat(thrown2).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + // Test commitKvSnapshot with permission + CommitKvSnapshotRequest commitKvRequest = new CommitKvSnapshotRequest(); + Throwable thrown3 = + catchThrowable( + () -> authorizedCoordinatorGateway.commitKvSnapshot(commitKvRequest).get()); + if (thrown3 != null) { + assertThat(thrown3).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + // Test commitLakeTableSnapshot with permission + CommitLakeTableSnapshotRequest commitLakeRequest = new CommitLakeTableSnapshotRequest(); + Throwable thrown4 = + catchThrowable( + () -> + authorizedCoordinatorGateway + .commitLakeTableSnapshot(commitLakeRequest) + .get()); + if (thrown4 != null) { + assertThat(thrown4).rootCause().isNotInstanceOf(AuthorizationException.class); + } + } + + // Test 6: Verify internal sessions bypass authorization + TabletServerGateway internalTabletGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("FLUSS").get(0), + FLUSS_CLUSTER_EXTENSION.getRpcClient(), + TabletServerGateway.class); + + CoordinatorGateway internalCoordinatorGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("FLUSS"), + FLUSS_CLUSTER_EXTENSION.getRpcClient(), + CoordinatorGateway.class); + + // Internal connections should NOT throw AuthorizationException + NotifyKvSnapshotOffsetRequest notifyKvRequest = new NotifyKvSnapshotOffsetRequest(); + Throwable thrown5 = + catchThrowable( + () -> internalTabletGateway.notifyKvSnapshotOffset(notifyKvRequest).get()); + if (thrown5 != null) { + assertThat(thrown5).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + NotifyLakeTableOffsetRequest notifyLakeRequest = new NotifyLakeTableOffsetRequest(); + Throwable thrown6 = + catchThrowable( + () -> internalTabletGateway.notifyLakeTableOffset(notifyLakeRequest).get()); + if (thrown6 != null) { + assertThat(thrown6).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + CommitKvSnapshotRequest commitKvRequest = new CommitKvSnapshotRequest(); + Throwable thrown7 = + catchThrowable( + () -> internalCoordinatorGateway.commitKvSnapshot(commitKvRequest).get()); + if (thrown7 != null) { + assertThat(thrown7).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + CommitLakeTableSnapshotRequest commitLakeRequest = new CommitLakeTableSnapshotRequest(); + Throwable thrown8 = + catchThrowable( + () -> + internalCoordinatorGateway + .commitLakeTableSnapshot(commitLakeRequest) + .get()); + if (thrown8 != null) { + assertThat(thrown8).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + // Cleanup + rootAdmin.dropAcls(Collections.singletonList(AclBindingFilter.ANY)).all().get(); + } + private static Configuration initConfig() { Configuration conf = new Configuration(); conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index bdc97434ec..4a1dbb1ca2 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -198,6 +198,7 @@ import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindingFilters; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings; +import static org.apache.fluss.security.acl.OperationType.WRITE; import static org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.getGoalByType; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.addTableOffsetsToResponse; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.fromTablePath; @@ -770,6 +771,9 @@ public CompletableFuture adjustIsr(AdjustIsrRequest request) @Override public CompletableFuture commitKvSnapshot( CommitKvSnapshotRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), WRITE, Resource.cluster()); + } CompletableFuture response = new CompletableFuture<>(); // parse completed snapshot from request byte[] completedSnapshotBytes = request.getCompletedSnapshot(); @@ -870,6 +874,9 @@ public CompletableFuture prepareLakeTableSnaps @Override public CompletableFuture commitLakeTableSnapshot( CommitLakeTableSnapshotRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), WRITE, Resource.cluster()); + } CompletableFuture response = new CompletableFuture<>(); eventManagerSupplier .get() diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java index 88731daaba..dc83172491 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java @@ -432,6 +432,9 @@ public CompletableFuture notifyRemoteLogOffsets( @Override public CompletableFuture notifyKvSnapshotOffset( NotifyKvSnapshotOffsetRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), WRITE, Resource.cluster()); + } CompletableFuture response = new CompletableFuture<>(); replicaManager.notifyKvSnapshotOffset( getNotifySnapshotOffsetData(request), response::complete); @@ -441,6 +444,9 @@ public CompletableFuture notifyKvSnapshotOffset( @Override public CompletableFuture notifyLakeTableOffset( NotifyLakeTableOffsetRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), WRITE, Resource.cluster()); + } CompletableFuture response = new CompletableFuture<>(); replicaManager.notifyLakeTableOffset(getNotifyLakeTableOffset(request), response::complete); return response; From 7d0db7972cd403522bb610a724e3e975f54094e0 Mon Sep 17 00:00:00 2001 From: vaibhav kumar Date: Mon, 11 May 2026 16:57:16 +0530 Subject: [PATCH 2/3] spotless fix --- .../acl/FlussAuthorizationITCase.java | 34 ++++++++++++++----- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java index 76785d0cc9..d46305d7c3 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java @@ -1394,7 +1394,8 @@ void testSnapshotManagementAuthorization() throws Exception { // Test 1: notifyKvSnapshotOffset without WRITE permission NotifyKvSnapshotOffsetRequest notifyKvRequest = new NotifyKvSnapshotOffsetRequest(); - assertThatThrownBy(() -> guestTabletGateway.notifyKvSnapshotOffset(notifyKvRequest).get()) + assertThatThrownBy( + () -> guestTabletGateway.notifyKvSnapshotOffset(notifyKvRequest).get()) .rootCause() .isInstanceOf(AuthorizationException.class) .hasMessageContaining( @@ -1404,7 +1405,8 @@ void testSnapshotManagementAuthorization() throws Exception { // Test 2: notifyLakeTableOffset without WRITE permission NotifyLakeTableOffsetRequest notifyLakeRequest = new NotifyLakeTableOffsetRequest(); - assertThatThrownBy(() -> guestTabletGateway.notifyLakeTableOffset(notifyLakeRequest).get()) + assertThatThrownBy( + () -> guestTabletGateway.notifyLakeTableOffset(notifyLakeRequest).get()) .rootCause() .isInstanceOf(AuthorizationException.class) .hasMessageContaining( @@ -1414,7 +1416,8 @@ void testSnapshotManagementAuthorization() throws Exception { // Test 3: commitKvSnapshot without WRITE permission CommitKvSnapshotRequest commitKvRequest = new CommitKvSnapshotRequest(); - assertThatThrownBy(() -> guestCoordinatorGateway.commitKvSnapshot(commitKvRequest).get()) + assertThatThrownBy( + () -> guestCoordinatorGateway.commitKvSnapshot(commitKvRequest).get()) .rootCause() .isInstanceOf(AuthorizationException.class) .hasMessageContaining( @@ -1425,7 +1428,10 @@ void testSnapshotManagementAuthorization() throws Exception { // Test 4: commitLakeTableSnapshot without WRITE permission CommitLakeTableSnapshotRequest commitLakeRequest = new CommitLakeTableSnapshotRequest(); assertThatThrownBy( - () -> guestCoordinatorGateway.commitLakeTableSnapshot(commitLakeRequest).get()) + () -> + guestCoordinatorGateway + .commitLakeTableSnapshot(commitLakeRequest) + .get()) .rootCause() .isInstanceOf(AuthorizationException.class) .hasMessageContaining( @@ -1440,7 +1446,10 @@ void testSnapshotManagementAuthorization() throws Exception { new AclBinding( Resource.cluster(), new AccessControlEntry( - guestPrincipal, "*", OperationType.WRITE, PermissionType.ALLOW))); + guestPrincipal, + "*", + OperationType.WRITE, + PermissionType.ALLOW))); rootAdmin.createAcls(aclBindings).all().get(); FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true); @@ -1463,7 +1472,10 @@ void testSnapshotManagementAuthorization() throws Exception { NotifyKvSnapshotOffsetRequest notifyKvRequest = new NotifyKvSnapshotOffsetRequest(); Throwable thrown1 = catchThrowable( - () -> authorizedTabletGateway.notifyKvSnapshotOffset(notifyKvRequest).get()); + () -> + authorizedTabletGateway + .notifyKvSnapshotOffset(notifyKvRequest) + .get()); if (thrown1 != null) { assertThat(thrown1).rootCause().isNotInstanceOf(AuthorizationException.class); } @@ -1472,7 +1484,10 @@ void testSnapshotManagementAuthorization() throws Exception { NotifyLakeTableOffsetRequest notifyLakeRequest = new NotifyLakeTableOffsetRequest(); Throwable thrown2 = catchThrowable( - () -> authorizedTabletGateway.notifyLakeTableOffset(notifyLakeRequest).get()); + () -> + authorizedTabletGateway + .notifyLakeTableOffset(notifyLakeRequest) + .get()); if (thrown2 != null) { assertThat(thrown2).rootCause().isNotInstanceOf(AuthorizationException.class); } @@ -1481,7 +1496,10 @@ void testSnapshotManagementAuthorization() throws Exception { CommitKvSnapshotRequest commitKvRequest = new CommitKvSnapshotRequest(); Throwable thrown3 = catchThrowable( - () -> authorizedCoordinatorGateway.commitKvSnapshot(commitKvRequest).get()); + () -> + authorizedCoordinatorGateway + .commitKvSnapshot(commitKvRequest) + .get()); if (thrown3 != null) { assertThat(thrown3).rootCause().isNotInstanceOf(AuthorizationException.class); } From bfa65dbe561cb2e97334d92b555c9dcb6c755a0c Mon Sep 17 00:00:00 2001 From: vaibhav kumar Date: Mon, 11 May 2026 19:36:22 +0530 Subject: [PATCH 3/3] [test] Fix snapshot management test - add required fields to request objects The test was failing because Protocol Buffer request objects require certain fields to be set before validation. Added required fields to all snapshot management RPC requests: - NotifyKvSnapshotOffsetRequest: tableId, bucketId, coordinatorEpoch, minRetainOffset - NotifyLakeTableOffsetRequest: coordinatorEpoch - CommitKvSnapshotRequest: completedSnapshot, coordinatorEpoch, bucketLeaderEpoch - CommitLakeTableSnapshotRequest: (no required fields, but keeping consistent) This ensures requests pass validation and reach the authorization check. --- .../acl/FlussAuthorizationITCase.java | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java index d46305d7c3..4ee3b82617 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java @@ -1394,6 +1394,10 @@ void testSnapshotManagementAuthorization() throws Exception { // Test 1: notifyKvSnapshotOffset without WRITE permission NotifyKvSnapshotOffsetRequest notifyKvRequest = new NotifyKvSnapshotOffsetRequest(); + notifyKvRequest.setTableId(1L); + notifyKvRequest.setBucketId(0); + notifyKvRequest.setCoordinatorEpoch(1); + notifyKvRequest.setMinRetainOffset(0L); assertThatThrownBy( () -> guestTabletGateway.notifyKvSnapshotOffset(notifyKvRequest).get()) .rootCause() @@ -1405,6 +1409,7 @@ void testSnapshotManagementAuthorization() throws Exception { // Test 2: notifyLakeTableOffset without WRITE permission NotifyLakeTableOffsetRequest notifyLakeRequest = new NotifyLakeTableOffsetRequest(); + notifyLakeRequest.setCoordinatorEpoch(1); assertThatThrownBy( () -> guestTabletGateway.notifyLakeTableOffset(notifyLakeRequest).get()) .rootCause() @@ -1416,6 +1421,9 @@ void testSnapshotManagementAuthorization() throws Exception { // Test 3: commitKvSnapshot without WRITE permission CommitKvSnapshotRequest commitKvRequest = new CommitKvSnapshotRequest(); + commitKvRequest.setCompletedSnapshot(new byte[0]); + commitKvRequest.setCoordinatorEpoch(1); + commitKvRequest.setBucketLeaderEpoch(1); assertThatThrownBy( () -> guestCoordinatorGateway.commitKvSnapshot(commitKvRequest).get()) .rootCause() @@ -1470,6 +1478,10 @@ void testSnapshotManagementAuthorization() throws Exception { // Test notifyKvSnapshotOffset with permission NotifyKvSnapshotOffsetRequest notifyKvRequest = new NotifyKvSnapshotOffsetRequest(); + notifyKvRequest.setTableId(1L); + notifyKvRequest.setBucketId(0); + notifyKvRequest.setCoordinatorEpoch(1); + notifyKvRequest.setMinRetainOffset(0L); Throwable thrown1 = catchThrowable( () -> @@ -1482,6 +1494,7 @@ void testSnapshotManagementAuthorization() throws Exception { // Test notifyLakeTableOffset with permission NotifyLakeTableOffsetRequest notifyLakeRequest = new NotifyLakeTableOffsetRequest(); + notifyLakeRequest.setCoordinatorEpoch(1); Throwable thrown2 = catchThrowable( () -> @@ -1494,6 +1507,9 @@ void testSnapshotManagementAuthorization() throws Exception { // Test commitKvSnapshot with permission CommitKvSnapshotRequest commitKvRequest = new CommitKvSnapshotRequest(); + commitKvRequest.setCompletedSnapshot(new byte[0]); + commitKvRequest.setCoordinatorEpoch(1); + commitKvRequest.setBucketLeaderEpoch(1); Throwable thrown3 = catchThrowable( () -> @@ -1532,6 +1548,10 @@ void testSnapshotManagementAuthorization() throws Exception { // Internal connections should NOT throw AuthorizationException NotifyKvSnapshotOffsetRequest notifyKvRequest = new NotifyKvSnapshotOffsetRequest(); + notifyKvRequest.setTableId(1L); + notifyKvRequest.setBucketId(0); + notifyKvRequest.setCoordinatorEpoch(1); + notifyKvRequest.setMinRetainOffset(0L); Throwable thrown5 = catchThrowable( () -> internalTabletGateway.notifyKvSnapshotOffset(notifyKvRequest).get()); @@ -1540,6 +1560,7 @@ void testSnapshotManagementAuthorization() throws Exception { } NotifyLakeTableOffsetRequest notifyLakeRequest = new NotifyLakeTableOffsetRequest(); + notifyLakeRequest.setCoordinatorEpoch(1); Throwable thrown6 = catchThrowable( () -> internalTabletGateway.notifyLakeTableOffset(notifyLakeRequest).get()); @@ -1548,6 +1569,9 @@ void testSnapshotManagementAuthorization() throws Exception { } CommitKvSnapshotRequest commitKvRequest = new CommitKvSnapshotRequest(); + commitKvRequest.setCompletedSnapshot(new byte[0]); + commitKvRequest.setCoordinatorEpoch(1); + commitKvRequest.setBucketLeaderEpoch(1); Throwable thrown7 = catchThrowable( () -> internalCoordinatorGateway.commitKvSnapshot(commitKvRequest).get());