From 488f56b9dc3250996a504ee4cc76e8cd65d39780 Mon Sep 17 00:00:00 2001 From: vaibhav kumar Date: Mon, 11 May 2026 16:28:45 +0530 Subject: [PATCH 1/3] [server] Add authorization to Remote Log & Tiering RPCs This commit adds CLUSTER/WRITE authorization checks for remote log and tiering internal RPCs as specified in issue #3251: Server Changes: - TabletService: Add authorization to notifyRemoteLogOffsets - CoordinatorService: Add authorization to commitRemoteLogManifest and lakeTieringHeartbeat - All methods check CLUSTER/WRITE permission before processing requests - Internal sessions automatically bypass authorization via session.isInternal() Test Coverage: - Add comprehensive test testRemoteLogAndTieringAuthorization() - Test 1-3: Verify AuthorizationException when client lacks CLUSTER/WRITE permission - Test 4: Grant CLUSTER/WRITE permission and verify operations succeed - Test 5: Verify internal server-to-server calls bypass authorization - Tests all 3 remote log/tiering operations with complete authorization lifecycle These are internal server-to-server RPCs used for: - notifyRemoteLogOffsets: Notify TabletServers about remote log tier offsets - commitRemoteLogManifest: Commit remote log manifests to CoordinatorServer - lakeTieringHeartbeat: Lake tiering service heartbeats to CoordinatorServer The authorization prevents external clients from calling internal cluster management APIs while allowing legitimate internal operations to proceed. Co-Authored-By: Claude Sonnet 4.5 --- .../acl/FlussAuthorizationITCase.java | 170 ++++++++++++++++++ .../coordinator/CoordinatorService.java | 7 + .../fluss/server/tablet/TabletService.java | 3 + 3 files changed, 180 insertions(+) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java index 0ac000359d..73915dbe3c 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java @@ -51,11 +51,14 @@ import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway; import org.apache.fluss.rpc.gateway.CoordinatorGateway; import org.apache.fluss.rpc.gateway.TabletServerGateway; +import org.apache.fluss.rpc.messages.CommitRemoteLogManifestRequest; import org.apache.fluss.rpc.messages.ControlledShutdownRequest; import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest; import org.apache.fluss.rpc.messages.InitWriterRequest; import org.apache.fluss.rpc.messages.InitWriterResponse; +import org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest; import org.apache.fluss.rpc.messages.MetadataRequest; +import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest; import org.apache.fluss.rpc.messages.ReleaseKvSnapshotLeaseRequest; import org.apache.fluss.rpc.metrics.TestingClientMetricGroup; import org.apache.fluss.security.acl.AccessControlEntry; @@ -1370,6 +1373,173 @@ void testTableExistsAuthorization() throws Exception { rootAdmin.dropTable(testTablePath, true).get(); } + @Test + void testRemoteLogAndTieringAuthorization() throws Exception { + // These RPCs are internal-only, so we test via direct gateway access + try (RpcClient rpcClient = + RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) { + + TabletServerGateway guestTabletGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("CLIENT").get(0), + rpcClient, + TabletServerGateway.class); + + CoordinatorGateway guestCoordinatorGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"), + rpcClient, + CoordinatorGateway.class); + + // Test 1: notifyRemoteLogOffsets without WRITE permission + NotifyRemoteLogOffsetsRequest notifyRemoteRequest = + new NotifyRemoteLogOffsetsRequest(); + assertThatThrownBy( + () -> guestTabletGateway.notifyRemoteLogOffsets(notifyRemoteRequest).get()) + .rootCause() + .isInstanceOf(AuthorizationException.class) + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + + // Test 2: commitRemoteLogManifest without WRITE permission + CommitRemoteLogManifestRequest commitManifestRequest = + new CommitRemoteLogManifestRequest(); + assertThatThrownBy( + () -> + guestCoordinatorGateway + .commitRemoteLogManifest(commitManifestRequest) + .get()) + .rootCause() + .isInstanceOf(AuthorizationException.class) + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + + // Test 3: lakeTieringHeartbeat without WRITE permission + LakeTieringHeartbeatRequest heartbeatRequest = new LakeTieringHeartbeatRequest(); + assertThatThrownBy( + () -> guestCoordinatorGateway.lakeTieringHeartbeat(heartbeatRequest).get()) + .rootCause() + .isInstanceOf(AuthorizationException.class) + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + } + + // Test 4: Grant CLUSTER/WRITE permission and verify operations succeed + List aclBindings = + Collections.singletonList( + new AclBinding( + Resource.cluster(), + new AccessControlEntry( + guestPrincipal, "*", OperationType.WRITE, PermissionType.ALLOW))); + rootAdmin.createAcls(aclBindings).all().get(); + FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true); + + try (RpcClient authorizedRpcClient = + RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) { + + TabletServerGateway authorizedTabletGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("CLIENT").get(0), + authorizedRpcClient, + TabletServerGateway.class); + + CoordinatorGateway authorizedCoordinatorGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"), + authorizedRpcClient, + CoordinatorGateway.class); + + // Test notifyRemoteLogOffsets with permission + NotifyRemoteLogOffsetsRequest notifyRemoteRequest = + new NotifyRemoteLogOffsetsRequest(); + Throwable thrown1 = + catchThrowable( + () -> + authorizedTabletGateway + .notifyRemoteLogOffsets(notifyRemoteRequest) + .get()); + if (thrown1 != null) { + assertThat(thrown1).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + // Test commitRemoteLogManifest with permission + CommitRemoteLogManifestRequest commitManifestRequest = + new CommitRemoteLogManifestRequest(); + Throwable thrown2 = + catchThrowable( + () -> + authorizedCoordinatorGateway + .commitRemoteLogManifest(commitManifestRequest) + .get()); + if (thrown2 != null) { + assertThat(thrown2).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + // Test lakeTieringHeartbeat with permission + LakeTieringHeartbeatRequest heartbeatRequest = new LakeTieringHeartbeatRequest(); + Throwable thrown3 = + catchThrowable( + () -> + authorizedCoordinatorGateway + .lakeTieringHeartbeat(heartbeatRequest) + .get()); + if (thrown3 != null) { + assertThat(thrown3).rootCause().isNotInstanceOf(AuthorizationException.class); + } + } + + // Test 5: Verify internal sessions bypass authorization + TabletServerGateway internalTabletGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("FLUSS").get(0), + FLUSS_CLUSTER_EXTENSION.getRpcClient(), + TabletServerGateway.class); + + CoordinatorGateway internalCoordinatorGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("FLUSS"), + FLUSS_CLUSTER_EXTENSION.getRpcClient(), + CoordinatorGateway.class); + + // Internal connections should NOT throw AuthorizationException + NotifyRemoteLogOffsetsRequest notifyRemoteRequest = new NotifyRemoteLogOffsetsRequest(); + Throwable thrown4 = + catchThrowable( + () -> internalTabletGateway.notifyRemoteLogOffsets(notifyRemoteRequest).get()); + if (thrown4 != null) { + assertThat(thrown4).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + CommitRemoteLogManifestRequest commitManifestRequest = + new CommitRemoteLogManifestRequest(); + Throwable thrown5 = + catchThrowable( + () -> + internalCoordinatorGateway + .commitRemoteLogManifest(commitManifestRequest) + .get()); + if (thrown5 != null) { + assertThat(thrown5).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + LakeTieringHeartbeatRequest heartbeatRequest = new LakeTieringHeartbeatRequest(); + Throwable thrown6 = + catchThrowable( + () -> internalCoordinatorGateway.lakeTieringHeartbeat(heartbeatRequest).get()); + if (thrown6 != null) { + assertThat(thrown6).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + // Cleanup + rootAdmin.dropAcls(Collections.singletonList(AclBindingFilter.ANY)).all().get(); + } + private static Configuration initConfig() { Configuration conf = new Configuration(); conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index bdc97434ec..7031eff00b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -198,6 +198,7 @@ import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindingFilters; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings; +import static org.apache.fluss.security.acl.OperationType.WRITE; import static org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.getGoalByType; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.addTableOffsetsToResponse; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.fromTablePath; @@ -787,6 +788,9 @@ public CompletableFuture commitKvSnapshot( @Override public CompletableFuture commitRemoteLogManifest( CommitRemoteLogManifestRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), WRITE, Resource.cluster()); + } CompletableFuture response = new CompletableFuture<>(); eventManagerSupplier .get() @@ -882,6 +886,9 @@ public CompletableFuture commitLakeTableSnapsho @Override public CompletableFuture lakeTieringHeartbeat( LakeTieringHeartbeatRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), WRITE, Resource.cluster()); + } LakeTieringHeartbeatResponse heartbeatResponse = new LakeTieringHeartbeatResponse(); int currentCoordinatorEpoch = coordinatorEpochSupplier.get(); heartbeatResponse.setCoordinatorEpoch(currentCoordinatorEpoch); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java index 88731daaba..dd853af5e4 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java @@ -423,6 +423,9 @@ public CompletableFuture initWriter(InitWriterRequest reques @Override public CompletableFuture notifyRemoteLogOffsets( NotifyRemoteLogOffsetsRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), WRITE, Resource.cluster()); + } CompletableFuture response = new CompletableFuture<>(); replicaManager.notifyRemoteLogOffsets( getNotifyRemoteLogOffsetsData(request), response::complete); From ad8ae9d46500a96089ec3a7b8f6bfca85390daaa Mon Sep 17 00:00:00 2001 From: vaibhav kumar Date: Mon, 11 May 2026 16:42:52 +0530 Subject: [PATCH 2/3] spotless fix --- .../acl/FlussAuthorizationITCase.java | 34 +++++++++++++------ 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java index 73915dbe3c..041161142e 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java @@ -1392,10 +1392,12 @@ void testRemoteLogAndTieringAuthorization() throws Exception { CoordinatorGateway.class); // Test 1: notifyRemoteLogOffsets without WRITE permission - NotifyRemoteLogOffsetsRequest notifyRemoteRequest = - new NotifyRemoteLogOffsetsRequest(); + NotifyRemoteLogOffsetsRequest notifyRemoteRequest = new NotifyRemoteLogOffsetsRequest(); assertThatThrownBy( - () -> guestTabletGateway.notifyRemoteLogOffsets(notifyRemoteRequest).get()) + () -> + guestTabletGateway + .notifyRemoteLogOffsets(notifyRemoteRequest) + .get()) .rootCause() .isInstanceOf(AuthorizationException.class) .hasMessageContaining( @@ -1421,7 +1423,10 @@ void testRemoteLogAndTieringAuthorization() throws Exception { // Test 3: lakeTieringHeartbeat without WRITE permission LakeTieringHeartbeatRequest heartbeatRequest = new LakeTieringHeartbeatRequest(); assertThatThrownBy( - () -> guestCoordinatorGateway.lakeTieringHeartbeat(heartbeatRequest).get()) + () -> + guestCoordinatorGateway + .lakeTieringHeartbeat(heartbeatRequest) + .get()) .rootCause() .isInstanceOf(AuthorizationException.class) .hasMessageContaining( @@ -1436,7 +1441,10 @@ void testRemoteLogAndTieringAuthorization() throws Exception { new AclBinding( Resource.cluster(), new AccessControlEntry( - guestPrincipal, "*", OperationType.WRITE, PermissionType.ALLOW))); + guestPrincipal, + "*", + OperationType.WRITE, + PermissionType.ALLOW))); rootAdmin.createAcls(aclBindings).all().get(); FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true); @@ -1456,8 +1464,7 @@ void testRemoteLogAndTieringAuthorization() throws Exception { CoordinatorGateway.class); // Test notifyRemoteLogOffsets with permission - NotifyRemoteLogOffsetsRequest notifyRemoteRequest = - new NotifyRemoteLogOffsetsRequest(); + NotifyRemoteLogOffsetsRequest notifyRemoteRequest = new NotifyRemoteLogOffsetsRequest(); Throwable thrown1 = catchThrowable( () -> @@ -1511,13 +1518,15 @@ void testRemoteLogAndTieringAuthorization() throws Exception { NotifyRemoteLogOffsetsRequest notifyRemoteRequest = new NotifyRemoteLogOffsetsRequest(); Throwable thrown4 = catchThrowable( - () -> internalTabletGateway.notifyRemoteLogOffsets(notifyRemoteRequest).get()); + () -> + internalTabletGateway + .notifyRemoteLogOffsets(notifyRemoteRequest) + .get()); if (thrown4 != null) { assertThat(thrown4).rootCause().isNotInstanceOf(AuthorizationException.class); } - CommitRemoteLogManifestRequest commitManifestRequest = - new CommitRemoteLogManifestRequest(); + CommitRemoteLogManifestRequest commitManifestRequest = new CommitRemoteLogManifestRequest(); Throwable thrown5 = catchThrowable( () -> @@ -1531,7 +1540,10 @@ void testRemoteLogAndTieringAuthorization() throws Exception { LakeTieringHeartbeatRequest heartbeatRequest = new LakeTieringHeartbeatRequest(); Throwable thrown6 = catchThrowable( - () -> internalCoordinatorGateway.lakeTieringHeartbeat(heartbeatRequest).get()); + () -> + internalCoordinatorGateway + .lakeTieringHeartbeat(heartbeatRequest) + .get()); if (thrown6 != null) { assertThat(thrown6).rootCause().isNotInstanceOf(AuthorizationException.class); } From 7ae5b1283efb7c00aef888395eb83f73ef08e5a1 Mon Sep 17 00:00:00 2001 From: vaibhav kumar Date: Mon, 11 May 2026 19:45:55 +0530 Subject: [PATCH 3/3] [test] Fix remote log and tiering test - add required fields to request objects The test was failing because Protocol Buffer request objects require certain fields to be set before validation. Added required fields to all remote log and tiering RPC requests: - NotifyRemoteLogOffsetsRequest: tableId, bucketId, coordinatorEpoch, remoteStartOffset, remoteEndOffset - CommitRemoteLogManifestRequest: tableId, bucketId, remoteLogManifestPath, remoteLogStartOffset, remoteLogEndOffset, coordinatorEpoch, bucketLeaderEpoch - LakeTieringHeartbeatRequest: (no required fields) This ensures requests pass validation and reach the authorization check. --- .../acl/FlussAuthorizationITCase.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java index 041161142e..d004b4ea0e 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java @@ -1393,6 +1393,11 @@ void testRemoteLogAndTieringAuthorization() throws Exception { // Test 1: notifyRemoteLogOffsets without WRITE permission NotifyRemoteLogOffsetsRequest notifyRemoteRequest = new NotifyRemoteLogOffsetsRequest(); + notifyRemoteRequest.setTableId(1L); + notifyRemoteRequest.setBucketId(0); + notifyRemoteRequest.setCoordinatorEpoch(1); + notifyRemoteRequest.setRemoteStartOffset(0L); + notifyRemoteRequest.setRemoteEndOffset(100L); assertThatThrownBy( () -> guestTabletGateway @@ -1408,6 +1413,13 @@ void testRemoteLogAndTieringAuthorization() throws Exception { // Test 2: commitRemoteLogManifest without WRITE permission CommitRemoteLogManifestRequest commitManifestRequest = new CommitRemoteLogManifestRequest(); + commitManifestRequest.setTableId(1L); + commitManifestRequest.setBucketId(0); + commitManifestRequest.setRemoteLogManifestPath("/path/to/manifest"); + commitManifestRequest.setRemoteLogStartOffset(0L); + commitManifestRequest.setRemoteLogEndOffset(100L); + commitManifestRequest.setCoordinatorEpoch(1); + commitManifestRequest.setBucketLeaderEpoch(1); assertThatThrownBy( () -> guestCoordinatorGateway @@ -1465,6 +1477,11 @@ void testRemoteLogAndTieringAuthorization() throws Exception { // Test notifyRemoteLogOffsets with permission NotifyRemoteLogOffsetsRequest notifyRemoteRequest = new NotifyRemoteLogOffsetsRequest(); + notifyRemoteRequest.setTableId(1L); + notifyRemoteRequest.setBucketId(0); + notifyRemoteRequest.setCoordinatorEpoch(1); + notifyRemoteRequest.setRemoteStartOffset(0L); + notifyRemoteRequest.setRemoteEndOffset(100L); Throwable thrown1 = catchThrowable( () -> @@ -1478,6 +1495,13 @@ void testRemoteLogAndTieringAuthorization() throws Exception { // Test commitRemoteLogManifest with permission CommitRemoteLogManifestRequest commitManifestRequest = new CommitRemoteLogManifestRequest(); + commitManifestRequest.setTableId(1L); + commitManifestRequest.setBucketId(0); + commitManifestRequest.setRemoteLogManifestPath("/path/to/manifest"); + commitManifestRequest.setRemoteLogStartOffset(0L); + commitManifestRequest.setRemoteLogEndOffset(100L); + commitManifestRequest.setCoordinatorEpoch(1); + commitManifestRequest.setBucketLeaderEpoch(1); Throwable thrown2 = catchThrowable( () -> @@ -1516,6 +1540,11 @@ void testRemoteLogAndTieringAuthorization() throws Exception { // Internal connections should NOT throw AuthorizationException NotifyRemoteLogOffsetsRequest notifyRemoteRequest = new NotifyRemoteLogOffsetsRequest(); + notifyRemoteRequest.setTableId(1L); + notifyRemoteRequest.setBucketId(0); + notifyRemoteRequest.setCoordinatorEpoch(1); + notifyRemoteRequest.setRemoteStartOffset(0L); + notifyRemoteRequest.setRemoteEndOffset(100L); Throwable thrown4 = catchThrowable( () -> @@ -1527,6 +1556,13 @@ void testRemoteLogAndTieringAuthorization() throws Exception { } CommitRemoteLogManifestRequest commitManifestRequest = new CommitRemoteLogManifestRequest(); + commitManifestRequest.setTableId(1L); + commitManifestRequest.setBucketId(0); + commitManifestRequest.setRemoteLogManifestPath("/path/to/manifest"); + commitManifestRequest.setRemoteLogStartOffset(0L); + commitManifestRequest.setRemoteLogEndOffset(100L); + commitManifestRequest.setCoordinatorEpoch(1); + commitManifestRequest.setBucketLeaderEpoch(1); Throwable thrown5 = catchThrowable( () ->