From b6c66a1c676860e873d9ec079cdc998518a4049a Mon Sep 17 00:00:00 2001 From: daidai Date: Tue, 10 Feb 2026 18:05:56 +0800 Subject: [PATCH 1/2] [feature](maxcompute)support maxcompute ram_role_arn and ecs_ram_role. --- be/src/runtime/descriptors.cpp | 9 +++ be/src/runtime/descriptors.h | 10 ++- .../format/table/max_compute_jni_reader.cpp | 34 ++++---- .../maxcompute/MaxComputeJniScanner.java | 10 +-- fe/fe-common/pom.xml | 14 ++++ .../common/maxcompute}/MCProperties.java | 19 ++--- .../doris/common/maxcompute/MCUtils.java | 78 +++++++++++++++++++ .../maxcompute/MaxComputeExternalCatalog.java | 33 ++------ .../maxcompute/MaxComputeExternalTable.java | 8 +- .../maxcompute/source/MaxComputeScanNode.java | 2 +- gensrc/thrift/Descriptors.thrift | 5 +- 11 files changed, 149 insertions(+), 73 deletions(-) rename fe/{fe-core/src/main/java/org/apache/doris/datasource/property/constants => fe-common/src/main/java/org/apache/doris/common/maxcompute}/MCProperties.java (90%) create mode 100644 fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCUtils.java diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index 2acbd4d0af173a..3847609d433a9e 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -310,6 +310,15 @@ MaxComputeTableDescriptor::MaxComputeTableDescriptor(const TTableDescriptor& tde _init_status = Status::InvalidArgument("fail to init MaxComputeTableDescriptor, missing quota."); } + + if (tdesc.mcTable.__isset.properties) [[likely]] { + _props = tdesc.mcTable.properties; + } else { + static const std::string MC_ACCESS_KEY = "mc.access_key"; + static const std::string MC_SECRET_KEY = "mc.secret_key"; + _props.insert({MC_ACCESS_KEY, _access_key}); + _props.insert({MC_SECRET_KEY, _secret_key}); + } } MaxComputeTableDescriptor::~MaxComputeTableDescriptor() = default; diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index f93750de325e66..3b6b0405c9556b 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -245,18 +245,20 @@ class MaxComputeTableDescriptor : public TableDescriptor { std::string endpoint() const { return _endpoint; } std::string quota() const { return _quota; } Status init_status() const { return _init_status; } + std::map properties() const { return _props; } private: std::string _region; //deprecated std::string _project; std::string _table; - std::string _odps_url; //deprecated - std::string _tunnel_url; //deprecated - std::string _access_key; - std::string _secret_key; + std::string _odps_url; //deprecated + std::string _tunnel_url; //deprecated + std::string _access_key; //deprecated + std::string _secret_key; //deprecated std::string _public_access; //deprecated std::string _endpoint; std::string _quota; + std::map _props; Status _init_status = Status::OK(); }; diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp index 81999f896173a9..4e167dc9d4015b 100644 --- a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp +++ b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp @@ -62,27 +62,27 @@ MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_des } index++; } - std::map params = { - {"access_key", _table_desc->access_key()}, - {"secret_key", _table_desc->secret_key()}, - {"endpoint", _table_desc->endpoint()}, - {"quota", _table_desc->quota()}, - {"project", _table_desc->project()}, - {"table", _table_desc->table()}, - {"session_id", _max_compute_params.session_id}, - {"scan_serializer", _max_compute_params.table_batch_read_session}, + auto properties = _table_desc->properties(); + properties["endpoint"] = _table_desc->endpoint(); + properties["quota"] = _table_desc->quota(); + properties["project"] = _table_desc->project(); + properties["table"] = _table_desc->table(); - {"start_offset", std::to_string(_range.start_offset)}, - {"split_size", std::to_string(_range.size)}, - {"required_fields", required_fields.str()}, - {"columns_types", columns_types.str()}, + properties["session_id"] = _max_compute_params.session_id; + properties["scan_serializer"] = _max_compute_params.table_batch_read_session; + + properties["start_offset"] = std::to_string(_range.start_offset); + properties["split_size"] = std::to_string(_range.size); + properties["required_fields"] = required_fields.str(); + properties["columns_types"] = columns_types.str(); + + properties["connect_timeout"] = std::to_string(_max_compute_params.connect_timeout); + properties["read_timeout"] = std::to_string(_max_compute_params.read_timeout); + properties["retry_count"] = std::to_string(_max_compute_params.retry_times); - {"connect_timeout", std::to_string(_max_compute_params.connect_timeout)}, - {"read_timeout", std::to_string(_max_compute_params.read_timeout)}, - {"retry_count", std::to_string(_max_compute_params.retry_times)}}; _jni_connector = std::make_unique( - "org/apache/doris/maxcompute/MaxComputeJniScanner", params, column_names); + "org/apache/doris/maxcompute/MaxComputeJniScanner", properties, column_names); } Status MaxComputeJniReader::init_reader() { diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java index 4128d66f09d0fc..0b196fcb9cd72a 100644 --- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java +++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java @@ -19,10 +19,9 @@ import org.apache.doris.common.jni.JniScanner; import org.apache.doris.common.jni.vec.ColumnType; +import org.apache.doris.common.maxcompute.MCUtils; import com.aliyun.odps.Odps; -import com.aliyun.odps.account.Account; -import com.aliyun.odps.account.AliyunAccount; import com.aliyun.odps.table.configuration.CompressionCodec; import com.aliyun.odps.table.configuration.ReaderOptions; import com.aliyun.odps.table.configuration.RestOptions; @@ -120,8 +119,6 @@ public MaxComputeJniScanner(int batchSize, Map params) { } } - String accessKey = Objects.requireNonNull(params.get(ACCESS_KEY), "required property '" + ACCESS_KEY + "'."); - String secretKey = Objects.requireNonNull(params.get(SECRET_KEY), "required property '" + SECRET_KEY + "'."); String endpoint = Objects.requireNonNull(params.get(ENDPOINT), "required property '" + ENDPOINT + "'."); String quota = Objects.requireNonNull(params.get(QUOTA), "required property '" + QUOTA + "'."); String scanSerializer = Objects.requireNonNull(params.get(SCAN_SERIALIZER), @@ -137,10 +134,7 @@ public MaxComputeJniScanner(int batchSize, Map params) { timeZone = ZoneId.systemDefault(); } - - Account account = new AliyunAccount(accessKey, secretKey); - Odps odps = new Odps(account); - + Odps odps = MCUtils.createMcClient(params); odps.setDefaultProject(project); odps.setEndpoint(endpoint); diff --git a/fe/fe-common/pom.xml b/fe/fe-common/pom.xml index 838b3e97881e03..b49e2d7289558c 100644 --- a/fe/fe-common/pom.xml +++ b/fe/fe-common/pom.xml @@ -108,6 +108,20 @@ under the License. org.apache.logging.log4j log4j-iostreams + + com.aliyun.odps + odps-sdk-core + + + org.apache.arrow + arrow-vector + + + org.ini4j + ini4j + + + doris-fe-common diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java b/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java similarity index 90% rename from fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java rename to fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java index 70feb48f2f7707..627f3bc03e2f82 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java @@ -15,16 +15,12 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.datasource.property.constants; - -import org.apache.doris.common.credentials.CloudCredential; - -import java.util.Map; +package org.apache.doris.common.maxcompute; /** * properties for aliyun max compute */ -public class MCProperties extends BaseProperties { +public class MCProperties { //To be compatible with previous versions of the catalog. public static final String REGION = "mc.region"; @@ -99,7 +95,12 @@ public class MCProperties extends BaseProperties { public static final String ENABLE_NAMESPACE_SCHEMA = "mc.enable.namespace.schema"; public static final String DEFAULT_ENABLE_NAMESPACE_SCHEMA = "false"; - public static CloudCredential getCredential(Map props) { - return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN); - } + public static final String AUTH_TYPE = "mc.auth.type"; + public static final String AUTH_TYPE_AK_SK = "ak_sk"; + public static final String AUTH_TYPE_RAM_ROLE_ARN = "ram_role_arn"; + public static final String AUTH_TYPE_ECS_RAM_ROLE = "ecs_ram_role"; + public static final String DEFAULT_AUTH_TYPE = AUTH_TYPE_AK_SK; + + public static final String RAM_ROLE_ARN = "mc.ram_role_arn"; + public static final String ECS_RAM_ROLE = "mc.ecs_ram_role"; } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCUtils.java b/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCUtils.java new file mode 100644 index 00000000000000..fc7f47fc2689a8 --- /dev/null +++ b/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCUtils.java @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.maxcompute; + +import com.aliyun.auth.credentials.Credential; +import com.aliyun.auth.credentials.provider.EcsRamRoleCredentialProvider; +import com.aliyun.auth.credentials.provider.RamRoleArnCredentialProvider; +import com.aliyun.odps.Odps; +import com.aliyun.odps.account.Account; +import com.aliyun.odps.account.AklessAccount; +import com.aliyun.odps.account.AliyunAccount; + +import java.util.Map; + +public class MCUtils { + public static void checkAuthProperties(Map properties) { + String authType = properties.getOrDefault(MCProperties.AUTH_TYPE, MCProperties.DEFAULT_AUTH_TYPE); + if (authType.equalsIgnoreCase(MCProperties.AUTH_TYPE_AK_SK)) { + if (!properties.containsKey(MCProperties.ACCESS_KEY) || !properties.containsKey(MCProperties.SECRET_KEY)) { + throw new RuntimeException("Missing access key or secret key for AK/SK auth type"); + } + } else if (authType.equalsIgnoreCase(MCProperties.AUTH_TYPE_RAM_ROLE_ARN)) { + if (!properties.containsKey(MCProperties.ACCESS_KEY) || !properties.containsKey(MCProperties.SECRET_KEY) + || !properties.containsKey(MCProperties.RAM_ROLE_ARN)) { + throw new RuntimeException("Missing access key, secret key or role arn for RAM Role ARN auth type"); + } + } else if (authType.equalsIgnoreCase(MCProperties.AUTH_TYPE_ECS_RAM_ROLE)) { + if (!properties.containsKey(MCProperties.ECS_RAM_ROLE)) { + throw new RuntimeException("Missing role name for ECS RAM Role auth type"); + } + } else { + throw new RuntimeException("Unsupported auth type: " + authType); + } + } + + public static Odps createMcClient(Map properties) { + String authType = properties.getOrDefault(MCProperties.AUTH_TYPE, MCProperties.DEFAULT_AUTH_TYPE); + if (authType.equalsIgnoreCase(MCProperties.AUTH_TYPE_AK_SK)) { + String accessKey = properties.get(MCProperties.ACCESS_KEY); + String secretKey = properties.get(MCProperties.SECRET_KEY); + Account account = new AliyunAccount(accessKey, secretKey); + return new Odps(account); + } else if (authType.equalsIgnoreCase(MCProperties.AUTH_TYPE_RAM_ROLE_ARN)) { + String accessKey = properties.get(MCProperties.ACCESS_KEY); + String secretKey = properties.get(MCProperties.SECRET_KEY); + String roleArn = properties.get(MCProperties.RAM_ROLE_ARN); + RamRoleArnCredentialProvider ramRoleArnCredentialProvider = + RamRoleArnCredentialProvider.builder().credential( + Credential.builder().accessKeyId(accessKey) + .accessKeySecret(secretKey).build()) + .roleArn(roleArn).build(); + AklessAccount aklessAccount = new AklessAccount(ramRoleArnCredentialProvider); + return new Odps(aklessAccount); + } else if (authType.equalsIgnoreCase(MCProperties.AUTH_TYPE_ECS_RAM_ROLE)) { + String roleName = properties.get(MCProperties.ECS_RAM_ROLE); + EcsRamRoleCredentialProvider credentialProvider = EcsRamRoleCredentialProvider.create(roleName); + AklessAccount aklessAccount = new AklessAccount(credentialProvider); + return new Odps(aklessAccount); + } else { + throw new RuntimeException("Unsupported auth type: " + authType); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java index 65af7f967fe849..edf339e6702830 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java @@ -19,18 +19,16 @@ import org.apache.doris.common.DdlException; -import org.apache.doris.common.credentials.CloudCredential; +import org.apache.doris.common.maxcompute.MCProperties; +import org.apache.doris.common.maxcompute.MCUtils; import org.apache.doris.datasource.CatalogProperty; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.SessionContext; -import org.apache.doris.datasource.property.constants.MCProperties; import com.aliyun.odps.Odps; import com.aliyun.odps.Partition; -import com.aliyun.odps.account.Account; import com.aliyun.odps.account.AccountFormat; -import com.aliyun.odps.account.AliyunAccount; import com.aliyun.odps.table.TableIdentifier; import com.aliyun.odps.table.configuration.RestOptions; import com.aliyun.odps.table.configuration.SplitOptions; @@ -54,9 +52,8 @@ public class MaxComputeExternalCatalog extends ExternalCatalog { // you can ref : https://help.aliyun.com/zh/maxcompute/user-guide/endpoints private static final String endpointTemplate = "http://service.{}.maxcompute.aliyun-inc.com/api"; + private Map props; private Odps odps; - private String accessKey; - private String secretKey; private String endpoint; private String defaultProject; private String quota; @@ -158,7 +155,7 @@ protected void generatorEndpoint() { @Override protected void initLocalObjectsImpl() { - Map props = catalogProperty.getProperties(); + props = catalogProperty.getProperties(); generatorEndpoint(); @@ -198,16 +195,11 @@ protected void initLocalObjectsImpl() { .withReadTimeout(readTimeout) .withRetryTimes(retryTimes).build(); - CloudCredential credential = MCProperties.getCredential(props); - accessKey = credential.getAccessKey(); - secretKey = credential.getSecretKey(); - dateTimePredicatePushDown = Boolean.parseBoolean( props.getOrDefault(MCProperties.DATETIME_PREDICATE_PUSH_DOWN, MCProperties.DEFAULT_DATETIME_PREDICATE_PUSH_DOWN)); - Account account = new AliyunAccount(accessKey, secretKey); - this.odps = new Odps(account); + odps = MCUtils.createMcClient(props); odps.setDefaultProject(defaultProject); odps.setEndpoint(endpoint); @@ -288,14 +280,9 @@ public List listTableNames(SessionContext ctx, String dbName) { return mcStructureHelper.listTableNames(getClient(), dbName); } - public String getAccessKey() { - makeSureInitialized(); - return accessKey; - } - - public String getSecretKey() { + public Map getProperties() { makeSureInitialized(); - return secretKey; + return props; } public String getEndpoint() { @@ -449,10 +436,6 @@ public void checkProperties() throws DdlException { + MCProperties.READ_TIMEOUT + "/" + MCProperties.RETRY_COUNT + "must be an integer"); } - CloudCredential credential = MCProperties.getCredential(props); - if (!credential.isWhole()) { - throw new DdlException("Max-Compute credential properties '" - + MCProperties.ACCESS_KEY + "' and '" + MCProperties.SECRET_KEY + "' are required."); - } + MCUtils.checkAuthProperties(props); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java index 09f052f2cea235..d7724b1c7e0181 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java @@ -322,17 +322,11 @@ public TTableDescriptor toThrift() { TMCTable tMcTable = new TMCTable(); MaxComputeExternalCatalog mcCatalog = ((MaxComputeExternalCatalog) catalog); - tMcTable.setAccessKey(mcCatalog.getAccessKey()); - tMcTable.setSecretKey(mcCatalog.getSecretKey()); - tMcTable.setOdpsUrl("deprecated"); - tMcTable.setRegion("deprecated"); + tMcTable.setProperties(mcCatalog.getProperties()); tMcTable.setEndpoint(mcCatalog.getEndpoint()); // use mc project as dbName tMcTable.setProject(dbName); tMcTable.setQuota(mcCatalog.getQuota()); - - tMcTable.setTunnelUrl("deprecated"); - tMcTable.setProject("deprecated"); tMcTable.setTable(name); TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.MAX_COMPUTE_TABLE, schema.size(), 0, getName(), dbName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java index dc6ced6ee0113a..502a2c645cdd41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java @@ -33,13 +33,13 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; +import org.apache.doris.common.maxcompute.MCProperties; import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.TableFormatType; import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog; import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable; import org.apache.doris.datasource.maxcompute.source.MaxComputeSplit.SplitType; -import org.apache.doris.datasource.property.constants.MCProperties; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.nereids.util.DateUtils; import org.apache.doris.planner.PlanNodeId; diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 411af7b54820b5..f7a36b08ef3a71 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -437,13 +437,14 @@ struct TMCTable { 1: optional string region // deprecated 2: optional string project 3: optional string table - 4: optional string access_key - 5: optional string secret_key + 4: optional string access_key // deprecated + 5: optional string secret_key // deprecated 6: optional string public_access // deprecated 7: optional string odps_url // deprecated 8: optional string tunnel_url // deprecated 9: optional string endpoint 10: optional string quota + 11: optional map properties // contains authentication properties } struct TTrinoConnectorTable { From 9f096ca6e5a313cee5fb1e5060b8983ffc02c5cc Mon Sep 17 00:00:00 2001 From: daidai Date: Sun, 1 Mar 2026 22:46:58 +0800 Subject: [PATCH 2/2] fix build --- .../main/java/org/apache/doris/common/util/PrintableMap.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PrintableMap.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PrintableMap.java index 784432ba9fe8cc..0f7d67c3454666 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PrintableMap.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PrintableMap.java @@ -17,8 +17,8 @@ package org.apache.doris.common.util; +import org.apache.doris.common.maxcompute.MCProperties; import org.apache.doris.datasource.property.ConnectorPropertiesUtils; -import org.apache.doris.datasource.property.constants.MCProperties; import org.apache.doris.datasource.property.metastore.AWSGlueMetaStoreBaseProperties; import org.apache.doris.datasource.property.metastore.AliyunDLFBaseProperties; import org.apache.doris.datasource.property.storage.AzureProperties;