From 6ad9087eeefceada7a0f3605d44fc8b352148791 Mon Sep 17 00:00:00 2001 From: Chenjunwei <138805230+xylaaaaa@users.noreply.github.com> Date: Fri, 27 Feb 2026 23:25:11 +0800 Subject: [PATCH] [feature](storage) Add OzoneProperties to support Apache Ozone (#60809) ## Proposed changes Add dedicated `OzoneProperties` to support Apache Ozone S3 Gateway as an explicit storage backend. --- .../doris/common/util/LocationPath.java | 4 + .../property/storage/OzoneProperties.java | 153 +++++++++++++++ .../property/storage/StorageProperties.java | 6 +- .../property/storage/OzonePropertiesTest.java | 183 ++++++++++++++++++ 4 files changed, 345 insertions(+), 1 deletion(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OzoneProperties.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OzonePropertiesTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java index 9bb64d09737c3e..1ce6576635de05 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java @@ -313,6 +313,10 @@ private static StorageProperties findStorageProperties(StorageProperties.Type ty && storagePropertiesMap.containsKey(StorageProperties.Type.MINIO)) { return storagePropertiesMap.get(StorageProperties.Type.MINIO); } + if (type == StorageProperties.Type.S3 + && storagePropertiesMap.containsKey(StorageProperties.Type.OZONE)) { + return storagePropertiesMap.get(StorageProperties.Type.OZONE); + } // Step 3: Compatibility fallback based on schema // In previous configurations, the schema name may not strictly match the actual storage type. diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OzoneProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OzoneProperties.java new file mode 100644 index 00000000000000..24d1079b998df9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OzoneProperties.java @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.property.storage; + +import org.apache.doris.datasource.property.ConnectorProperty; + +import com.google.common.collect.ImmutableSet; +import lombok.Getter; +import lombok.Setter; +import org.apache.commons.lang3.StringUtils; + +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +public class OzoneProperties extends AbstractS3CompatibleProperties { + + @Setter + @Getter + @ConnectorProperty(names = {"ozone.endpoint", "s3.endpoint"}, + required = false, + description = "The endpoint of Ozone S3 Gateway.") + protected String endpoint = ""; + + @Setter + @Getter + @ConnectorProperty(names = {"ozone.region", "s3.region"}, + required = false, + description = "The region of Ozone S3 Gateway.") + protected String region = "us-east-1"; + + @Getter + @ConnectorProperty(names = {"ozone.access_key", "s3.access_key", "s3.access-key-id"}, + required = false, + sensitive = true, + description = "The access key of Ozone S3 Gateway.") + protected String accessKey = ""; + + @Getter + @ConnectorProperty(names = {"ozone.secret_key", "s3.secret_key", "s3.secret-access-key"}, + required = false, + sensitive = true, + description = "The secret key of Ozone S3 Gateway.") + protected String secretKey = ""; + + @Getter + @ConnectorProperty(names = {"ozone.session_token", "s3.session_token", "s3.session-token"}, + required = false, + sensitive = true, + description = "The session token of Ozone S3 Gateway.") + protected String sessionToken = ""; + + @Getter + @ConnectorProperty(names = {"ozone.connection.maximum", "s3.connection.maximum"}, + required = false, + description = "Maximum number of connections.") + protected String maxConnections = "100"; + + @Getter + @ConnectorProperty(names = {"ozone.connection.request.timeout", "s3.connection.request.timeout"}, + required = false, + description = "Request timeout in seconds.") + protected String requestTimeoutS = "10000"; + + @Getter + @ConnectorProperty(names = {"ozone.connection.timeout", "s3.connection.timeout"}, + required = false, + description = "Connection timeout in seconds.") + protected String connectionTimeoutS = "10000"; + + @Setter + @Getter + @ConnectorProperty(names = {"ozone.use_path_style", "use_path_style", "s3.path-style-access"}, + required = false, + description = "Whether to use path style URL for the storage.") + protected String usePathStyle = "true"; + + @Setter + @Getter + @ConnectorProperty(names = {"ozone.force_parsing_by_standard_uri", "force_parsing_by_standard_uri"}, + required = false, + description = "Whether to use path style URL for the storage.") + protected String forceParsingByStandardUrl = "false"; + + protected OzoneProperties(Map origProps) { + super(Type.OZONE, origProps); + } + + @Override + public void initNormalizeAndCheckProps() { + hydrateFromOriginalProps(); + super.initNormalizeAndCheckProps(); + hydrateFromOriginalProps(); + } + + private void hydrateFromOriginalProps() { + endpoint = StringUtils.firstNonBlank( + endpoint, + origProps.get("ozone.endpoint"), + origProps.get("s3.endpoint")); + region = StringUtils.firstNonBlank(region, origProps.get("ozone.region"), origProps.get("s3.region")); + accessKey = StringUtils.firstNonBlank( + accessKey, + origProps.get("ozone.access_key"), + origProps.get("s3.access_key"), + origProps.get("s3.access-key-id")); + secretKey = StringUtils.firstNonBlank( + secretKey, + origProps.get("ozone.secret_key"), + origProps.get("s3.secret_key"), + origProps.get("s3.secret-access-key")); + sessionToken = StringUtils.firstNonBlank(sessionToken, origProps.get("ozone.session_token"), + origProps.get("s3.session_token"), origProps.get("s3.session-token")); + usePathStyle = StringUtils.firstNonBlank(usePathStyle, origProps.get("ozone.use_path_style"), + origProps.get("use_path_style"), origProps.get("s3.path-style-access")); + forceParsingByStandardUrl = StringUtils.firstNonBlank(forceParsingByStandardUrl, + origProps.get("ozone.force_parsing_by_standard_uri"), + origProps.get("force_parsing_by_standard_uri")); + } + + @Override + protected Set endpointPatterns() { + return ImmutableSet.of(Pattern.compile("^(?:https?://)?[a-zA-Z0-9.-]+(?::\\d+)?$")); + } + + @Override + protected void setEndpointIfPossible() { + super.setEndpointIfPossible(); + if (StringUtils.isBlank(getEndpoint())) { + throw new IllegalArgumentException("Property ozone.endpoint is required."); + } + } + + @Override + protected Set schemas() { + return ImmutableSet.of("s3", "s3a", "s3n"); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java index 5de749f45d664c..0464dabbc67446 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java @@ -42,6 +42,7 @@ public abstract class StorageProperties extends ConnectionProperties { public static final String FS_S3_SUPPORT = "fs.s3.support"; public static final String FS_GCS_SUPPORT = "fs.gcs.support"; public static final String FS_MINIO_SUPPORT = "fs.minio.support"; + public static final String FS_OZONE_SUPPORT = "fs.ozone.support"; public static final String FS_BROKER_SUPPORT = "fs.broker.support"; public static final String FS_AZURE_SUPPORT = "fs.azure.support"; public static final String FS_OSS_SUPPORT = "fs.oss.support"; @@ -67,6 +68,7 @@ public enum Type { GCS, OSS_HDFS, MINIO, + OZONE, AZURE, BROKER, LOCAL, @@ -203,7 +205,9 @@ public static StorageProperties createPrimary(Map origProps) { props -> (isFsSupport(props, FS_AZURE_SUPPORT) || AzureProperties.guessIsMe(props)) ? new AzureProperties(props) : null, props -> (isFsSupport(props, FS_MINIO_SUPPORT) - || MinioProperties.guessIsMe(props)) ? new MinioProperties(props) : null, + || (!isFsSupport(props, FS_OZONE_SUPPORT) + && MinioProperties.guessIsMe(props))) ? new MinioProperties(props) : null, + props -> isFsSupport(props, FS_OZONE_SUPPORT) ? new OzoneProperties(props) : null, props -> (isFsSupport(props, FS_BROKER_SUPPORT) || BrokerProperties.guessIsMe(props)) ? new BrokerProperties(props) : null, props -> (isFsSupport(props, FS_LOCAL_SUPPORT) diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OzonePropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OzonePropertiesTest.java new file mode 100644 index 00000000000000..fe6701f9163e61 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OzonePropertiesTest.java @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.property.storage; + +import org.apache.doris.common.ExceptionChecker; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.LocationPath; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class OzonePropertiesTest { + private Map origProps; + + @BeforeEach + public void setup() { + origProps = new HashMap<>(); + } + + @Test + public void testValidOzoneConfiguration() { + origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true"); + origProps.put("ozone.endpoint", "http://ozone-s3g:9878"); + origProps.put("ozone.access_key", "hadoop"); + origProps.put("ozone.secret_key", "hadoop"); + + OzoneProperties ozoneProperties = (OzoneProperties) StorageProperties.createPrimary(origProps); + Map backendProps = ozoneProperties.getBackendConfigProperties(); + + Assertions.assertEquals(StorageProperties.Type.OZONE, ozoneProperties.getType()); + Assertions.assertEquals("http://ozone-s3g:9878", ozoneProperties.getEndpoint()); + Assertions.assertEquals("hadoop", ozoneProperties.getAccessKey()); + Assertions.assertEquals("hadoop", ozoneProperties.getSecretKey()); + Assertions.assertEquals("us-east-1", ozoneProperties.getRegion()); + Assertions.assertEquals("true", ozoneProperties.getUsePathStyle()); + + Assertions.assertEquals("http://ozone-s3g:9878", backendProps.get("AWS_ENDPOINT")); + Assertions.assertEquals("hadoop", backendProps.get("AWS_ACCESS_KEY")); + Assertions.assertEquals("hadoop", backendProps.get("AWS_SECRET_KEY")); + Assertions.assertEquals("us-east-1", backendProps.get("AWS_REGION")); + Assertions.assertEquals("true", backendProps.get("use_path_style")); + } + + @Test + public void testS3PropertiesBinding() { + origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true"); + origProps.put("s3.endpoint", "http://ozone-s3g:9878"); + origProps.put("s3.access_key", "hadoop"); + origProps.put("s3.secret_key", "hadoop"); + origProps.put("use_path_style", "true"); + origProps.put("s3.region", "us-east-1"); + + OzoneProperties ozoneProperties = (OzoneProperties) StorageProperties.createPrimary(origProps); + Map backendProps = ozoneProperties.getBackendConfigProperties(); + + Assertions.assertEquals("http://ozone-s3g:9878", ozoneProperties.getEndpoint()); + Assertions.assertEquals("hadoop", ozoneProperties.getAccessKey()); + Assertions.assertEquals("hadoop", ozoneProperties.getSecretKey()); + Assertions.assertEquals("true", ozoneProperties.getUsePathStyle()); + + Assertions.assertEquals("http://ozone-s3g:9878", backendProps.get("AWS_ENDPOINT")); + Assertions.assertEquals("hadoop", backendProps.get("AWS_ACCESS_KEY")); + Assertions.assertEquals("hadoop", backendProps.get("AWS_SECRET_KEY")); + } + + @Test + public void testFsS3aPropertiesAreNotSupported() { + origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true"); + origProps.put("fs.s3a.endpoint", "http://ozone-s3g:9878"); + origProps.put("fs.s3a.access.key", "hadoop"); + origProps.put("fs.s3a.secret.key", "hadoop"); + + ExceptionChecker.expectThrowsWithMsg(IllegalArgumentException.class, + "Property ozone.endpoint is required.", + () -> StorageProperties.createPrimary(origProps)); + } + + @Test + public void testCreateAllWithDefaultFs() throws UserException { + origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true"); + origProps.put("fs.defaultFS", "s3a://dn-data/"); + origProps.put("s3.endpoint", "http://ozone-s3g:9878"); + origProps.put("s3.access_key", "hadoop"); + origProps.put("s3.secret_key", "hadoop"); + origProps.put("use_path_style", "true"); + + List properties = StorageProperties.createAll(origProps); + Assertions.assertEquals(HdfsProperties.class, properties.get(0).getClass()); + Assertions.assertEquals(OzoneProperties.class, properties.get(1).getClass()); + + Map propertiesMap = properties.stream() + .collect(Collectors.toMap(StorageProperties::getType, Function.identity())); + LocationPath locationPath = LocationPath.of("s3a://dn-data/warehouse/test_table", propertiesMap); + Assertions.assertTrue(locationPath.getStorageProperties() instanceof OzoneProperties); + } + + @Test + public void testCreateAllWithDefaultFsAndOzoneProperties() throws UserException { + origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true"); + origProps.put("fs.defaultFS", "s3a://dn-data/"); + origProps.put("ozone.endpoint", "http://ozone-s3g:9878"); + origProps.put("ozone.access_key", "hadoop"); + origProps.put("ozone.secret_key", "hadoop"); + origProps.put("ozone.use_path_style", "true"); + origProps.put("ozone.region", "us-east-1"); + + List properties = StorageProperties.createAll(origProps); + Assertions.assertEquals(HdfsProperties.class, properties.get(0).getClass()); + Assertions.assertEquals(OzoneProperties.class, properties.get(1).getClass()); + + OzoneProperties ozoneProperties = (OzoneProperties) properties.get(1); + Assertions.assertEquals("hadoop", ozoneProperties.getHadoopStorageConfig().get("fs.s3a.access.key")); + Assertions.assertEquals("hadoop", ozoneProperties.getHadoopStorageConfig().get("fs.s3a.secret.key")); + Assertions.assertEquals("http://ozone-s3g:9878", ozoneProperties.getHadoopStorageConfig().get("fs.s3a.endpoint")); + Assertions.assertEquals("us-east-1", ozoneProperties.getHadoopStorageConfig().get("fs.s3a.endpoint.region")); + Assertions.assertEquals("true", ozoneProperties.getHadoopStorageConfig().get("fs.s3a.path.style.access")); + } + + @Test + public void testMissingAccessKeyOrSecretKey() { + origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true"); + origProps.put("ozone.endpoint", "http://ozone-s3g:9878"); + origProps.put("ozone.access_key", "hadoop"); + ExceptionChecker.expectThrowsWithMsg(IllegalArgumentException.class, + "Both the access key and the secret key must be set.", + () -> StorageProperties.createPrimary(origProps)); + + origProps.remove("ozone.access_key"); + origProps.put("ozone.secret_key", "hadoop"); + ExceptionChecker.expectThrowsWithMsg(IllegalArgumentException.class, + "Both the access key and the secret key must be set.", + () -> StorageProperties.createPrimary(origProps)); + } + + @Test + public void testMissingEndpoint() { + origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true"); + origProps.put("ozone.access_key", "hadoop"); + origProps.put("ozone.secret_key", "hadoop"); + ExceptionChecker.expectThrowsWithMsg(IllegalArgumentException.class, + "Property ozone.endpoint is required.", + () -> StorageProperties.createPrimary(origProps)); + } + + @Test + public void testRequireExplicitFsOzoneSupport() throws UserException { + origProps.put("ozone.endpoint", "http://127.0.0.1:9878"); + origProps.put("ozone.access_key", "hadoop"); + origProps.put("ozone.secret_key", "hadoop"); + + List propertiesWithoutFlag = StorageProperties.createAll(origProps); + Assertions.assertEquals(1, propertiesWithoutFlag.size()); + Assertions.assertEquals(HdfsProperties.class, propertiesWithoutFlag.get(0).getClass()); + + origProps.put(StorageProperties.FS_OZONE_SUPPORT, "true"); + List propertiesWithFlag = StorageProperties.createAll(origProps); + Assertions.assertEquals(2, propertiesWithFlag.size()); + Assertions.assertEquals(HdfsProperties.class, propertiesWithFlag.get(0).getClass()); + Assertions.assertEquals(OzoneProperties.class, propertiesWithFlag.get(1).getClass()); + } +}