diff --git a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java index 0ac000359d..d004b4ea0e 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java @@ -51,11 +51,14 @@ import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway; import org.apache.fluss.rpc.gateway.CoordinatorGateway; import org.apache.fluss.rpc.gateway.TabletServerGateway; +import org.apache.fluss.rpc.messages.CommitRemoteLogManifestRequest; import org.apache.fluss.rpc.messages.ControlledShutdownRequest; import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest; import org.apache.fluss.rpc.messages.InitWriterRequest; import org.apache.fluss.rpc.messages.InitWriterResponse; +import org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest; import org.apache.fluss.rpc.messages.MetadataRequest; +import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest; import org.apache.fluss.rpc.messages.ReleaseKvSnapshotLeaseRequest; import org.apache.fluss.rpc.metrics.TestingClientMetricGroup; import org.apache.fluss.security.acl.AccessControlEntry; @@ -1370,6 +1373,221 @@ void testTableExistsAuthorization() throws Exception { rootAdmin.dropTable(testTablePath, true).get(); } + @Test + void testRemoteLogAndTieringAuthorization() throws Exception { + // These RPCs are internal-only, so we test via direct gateway access + try (RpcClient rpcClient = + RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) { + + TabletServerGateway guestTabletGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("CLIENT").get(0), + rpcClient, + TabletServerGateway.class); + + CoordinatorGateway guestCoordinatorGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"), + rpcClient, + CoordinatorGateway.class); + + // Test 1: notifyRemoteLogOffsets without WRITE permission + NotifyRemoteLogOffsetsRequest notifyRemoteRequest = new NotifyRemoteLogOffsetsRequest(); + notifyRemoteRequest.setTableId(1L); + notifyRemoteRequest.setBucketId(0); + notifyRemoteRequest.setCoordinatorEpoch(1); + notifyRemoteRequest.setRemoteStartOffset(0L); + notifyRemoteRequest.setRemoteEndOffset(100L); + assertThatThrownBy( + () -> + guestTabletGateway + .notifyRemoteLogOffsets(notifyRemoteRequest) + .get()) + .rootCause() + .isInstanceOf(AuthorizationException.class) + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + + // Test 2: commitRemoteLogManifest without WRITE permission + CommitRemoteLogManifestRequest commitManifestRequest = + new CommitRemoteLogManifestRequest(); + commitManifestRequest.setTableId(1L); + commitManifestRequest.setBucketId(0); + commitManifestRequest.setRemoteLogManifestPath("/path/to/manifest"); + commitManifestRequest.setRemoteLogStartOffset(0L); + commitManifestRequest.setRemoteLogEndOffset(100L); + commitManifestRequest.setCoordinatorEpoch(1); + commitManifestRequest.setBucketLeaderEpoch(1); + assertThatThrownBy( + () -> + guestCoordinatorGateway + .commitRemoteLogManifest(commitManifestRequest) + .get()) + .rootCause() + .isInstanceOf(AuthorizationException.class) + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + + // Test 3: lakeTieringHeartbeat without WRITE permission + LakeTieringHeartbeatRequest heartbeatRequest = new LakeTieringHeartbeatRequest(); + assertThatThrownBy( + () -> + guestCoordinatorGateway + .lakeTieringHeartbeat(heartbeatRequest) + .get()) + .rootCause() + .isInstanceOf(AuthorizationException.class) + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + } + + // Test 4: Grant CLUSTER/WRITE permission and verify operations succeed + List aclBindings = + Collections.singletonList( + new AclBinding( + Resource.cluster(), + new AccessControlEntry( + guestPrincipal, + "*", + OperationType.WRITE, + PermissionType.ALLOW))); + rootAdmin.createAcls(aclBindings).all().get(); + FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true); + + try (RpcClient authorizedRpcClient = + RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) { + + TabletServerGateway authorizedTabletGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("CLIENT").get(0), + authorizedRpcClient, + TabletServerGateway.class); + + CoordinatorGateway authorizedCoordinatorGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"), + authorizedRpcClient, + CoordinatorGateway.class); + + // Test notifyRemoteLogOffsets with permission + NotifyRemoteLogOffsetsRequest notifyRemoteRequest = new NotifyRemoteLogOffsetsRequest(); + notifyRemoteRequest.setTableId(1L); + notifyRemoteRequest.setBucketId(0); + notifyRemoteRequest.setCoordinatorEpoch(1); + notifyRemoteRequest.setRemoteStartOffset(0L); + notifyRemoteRequest.setRemoteEndOffset(100L); + Throwable thrown1 = + catchThrowable( + () -> + authorizedTabletGateway + .notifyRemoteLogOffsets(notifyRemoteRequest) + .get()); + if (thrown1 != null) { + assertThat(thrown1).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + // Test commitRemoteLogManifest with permission + CommitRemoteLogManifestRequest commitManifestRequest = + new CommitRemoteLogManifestRequest(); + commitManifestRequest.setTableId(1L); + commitManifestRequest.setBucketId(0); + commitManifestRequest.setRemoteLogManifestPath("/path/to/manifest"); + commitManifestRequest.setRemoteLogStartOffset(0L); + commitManifestRequest.setRemoteLogEndOffset(100L); + commitManifestRequest.setCoordinatorEpoch(1); + commitManifestRequest.setBucketLeaderEpoch(1); + Throwable thrown2 = + catchThrowable( + () -> + authorizedCoordinatorGateway + .commitRemoteLogManifest(commitManifestRequest) + .get()); + if (thrown2 != null) { + assertThat(thrown2).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + // Test lakeTieringHeartbeat with permission + LakeTieringHeartbeatRequest heartbeatRequest = new LakeTieringHeartbeatRequest(); + Throwable thrown3 = + catchThrowable( + () -> + authorizedCoordinatorGateway + .lakeTieringHeartbeat(heartbeatRequest) + .get()); + if (thrown3 != null) { + assertThat(thrown3).rootCause().isNotInstanceOf(AuthorizationException.class); + } + } + + // Test 5: Verify internal sessions bypass authorization + TabletServerGateway internalTabletGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("FLUSS").get(0), + FLUSS_CLUSTER_EXTENSION.getRpcClient(), + TabletServerGateway.class); + + CoordinatorGateway internalCoordinatorGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("FLUSS"), + FLUSS_CLUSTER_EXTENSION.getRpcClient(), + CoordinatorGateway.class); + + // Internal connections should NOT throw AuthorizationException + NotifyRemoteLogOffsetsRequest notifyRemoteRequest = new NotifyRemoteLogOffsetsRequest(); + notifyRemoteRequest.setTableId(1L); + notifyRemoteRequest.setBucketId(0); + notifyRemoteRequest.setCoordinatorEpoch(1); + notifyRemoteRequest.setRemoteStartOffset(0L); + notifyRemoteRequest.setRemoteEndOffset(100L); + Throwable thrown4 = + catchThrowable( + () -> + internalTabletGateway + .notifyRemoteLogOffsets(notifyRemoteRequest) + .get()); + if (thrown4 != null) { + assertThat(thrown4).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + CommitRemoteLogManifestRequest commitManifestRequest = new CommitRemoteLogManifestRequest(); + commitManifestRequest.setTableId(1L); + commitManifestRequest.setBucketId(0); + commitManifestRequest.setRemoteLogManifestPath("/path/to/manifest"); + commitManifestRequest.setRemoteLogStartOffset(0L); + commitManifestRequest.setRemoteLogEndOffset(100L); + commitManifestRequest.setCoordinatorEpoch(1); + commitManifestRequest.setBucketLeaderEpoch(1); + Throwable thrown5 = + catchThrowable( + () -> + internalCoordinatorGateway + .commitRemoteLogManifest(commitManifestRequest) + .get()); + if (thrown5 != null) { + assertThat(thrown5).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + LakeTieringHeartbeatRequest heartbeatRequest = new LakeTieringHeartbeatRequest(); + Throwable thrown6 = + catchThrowable( + () -> + internalCoordinatorGateway + .lakeTieringHeartbeat(heartbeatRequest) + .get()); + if (thrown6 != null) { + assertThat(thrown6).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + // Cleanup + rootAdmin.dropAcls(Collections.singletonList(AclBindingFilter.ANY)).all().get(); + } + private static Configuration initConfig() { Configuration conf = new Configuration(); conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index bdc97434ec..7031eff00b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -198,6 +198,7 @@ import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindingFilters; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings; +import static org.apache.fluss.security.acl.OperationType.WRITE; import static org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.getGoalByType; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.addTableOffsetsToResponse; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.fromTablePath; @@ -787,6 +788,9 @@ public CompletableFuture commitKvSnapshot( @Override public CompletableFuture commitRemoteLogManifest( CommitRemoteLogManifestRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), WRITE, Resource.cluster()); + } CompletableFuture response = new CompletableFuture<>(); eventManagerSupplier .get() @@ -882,6 +886,9 @@ public CompletableFuture commitLakeTableSnapsho @Override public CompletableFuture lakeTieringHeartbeat( LakeTieringHeartbeatRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), WRITE, Resource.cluster()); + } LakeTieringHeartbeatResponse heartbeatResponse = new LakeTieringHeartbeatResponse(); int currentCoordinatorEpoch = coordinatorEpochSupplier.get(); heartbeatResponse.setCoordinatorEpoch(currentCoordinatorEpoch); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java index 88731daaba..dd853af5e4 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java @@ -423,6 +423,9 @@ public CompletableFuture initWriter(InitWriterRequest reques @Override public CompletableFuture notifyRemoteLogOffsets( NotifyRemoteLogOffsetsRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), WRITE, Resource.cluster()); + } CompletableFuture response = new CompletableFuture<>(); replicaManager.notifyRemoteLogOffsets( getNotifyRemoteLogOffsetsData(request), response::complete);