diff --git a/dev/ci/check-suites.py b/dev/ci/check-suites.py index 279c6a89c9..b7369d1707 100644 --- a/dev/ci/check-suites.py +++ b/dev/ci/check-suites.py @@ -35,6 +35,7 @@ def file_to_class_name(path: Path) -> str | None: "org.apache.comet.parquet.ParquetReadSuite", # abstract "org.apache.comet.parquet.ParquetReadFromS3Suite", # manual test suite "org.apache.comet.IcebergReadFromS3Suite", # manual test suite + "org.apache.comet.cloud.s3.CometS3CredentialBridgeSuite", # manual test suite "org.apache.spark.sql.comet.CometPlanStabilitySuite", # abstract "org.apache.spark.sql.comet.ParquetDatetimeRebaseSuite", # abstract "org.apache.comet.exec.CometColumnarShuffleSuite" # abstract diff --git a/docs/source/contributor-guide/index.md b/docs/source/contributor-guide/index.md index f3bbfba044..0b8269e7b9 100644 --- a/docs/source/contributor-guide/index.md +++ b/docs/source/contributor-guide/index.md @@ -47,6 +47,7 @@ Benchmarking Guide Adding a New Operator Adding a New Expression Adding a New Spark Version +S3 Credential Provider Design Supported Spark Expressions Supported Spark Configurations Tracing diff --git a/docs/source/contributor-guide/s3-credential-provider-design.md b/docs/source/contributor-guide/s3-credential-provider-design.md new file mode 100644 index 0000000000..33f1b46932 --- /dev/null +++ b/docs/source/contributor-guide/s3-credential-provider-design.md @@ -0,0 +1,106 @@ + + +# S3 Credential Provider SPI: Design Notes + +This page captures why the `org.apache.comet.cloud.s3.CometS3CredentialProvider` SPI is shaped the way it is. The user-facing contract and operator setup live in the user guide page on S3 credential providers; this page is for maintainers and reviewers who want the design rationale. + +## The gap the SPI fills + +Comet's native scan paths (`object_store` for raw Parquet, `opendal` via `iceberg-rust` for Iceberg) bypass Spark's Hadoop S3A code path. That means credentials cannot flow through any of the contracts that vendors typically wire into for S3A: + +- `org.apache.spark.deploy.security.cloud.CloudCredentialsProvider` yields a single JWT per service name. No path argument, no AWS credential. +- Hadoop S3A custom signers hide path-aware logic inside `Signer.sign(request, credentials)`. The credential never leaves the signing pipeline, and the underlying secret is an HMAC key that is not present in the signed output, so running the signer against a synthesized request cannot recover it. +- `AWSCredentialsProvider.getCredentials()` (AWS SDK v1) and `AwsCredentialsProvider.resolveCredentials()` (v2) are parameterless. They cannot vend per-path credentials. +- Reflecting into vendor singletons would encode per-vendor class names and lifecycles in Comet and would silently break on vendor upgrades. + +A Comet-specific SPI is the narrowest fit: a single Java method that takes a `CometS3CredentialContext` (today wrapping `bucket`, `path`, and access `mode`; new fields can be added without breaking vendors compiled against earlier versions) and returns `CometS3Credentials`. + +## Why config-driven activation, not `META-INF/services` + +An earlier iteration used `ServiceLoader` discovery. That was rejected because: + +- Peer SPIs in the same space (Hadoop `AWSCredentialsProvider`, AWS SDK v2 `AwsCredentialsProvider`, Iceberg `AwsClientFactory`, S3A custom signers) are all class-name-in-config. Vendors are already familiar with that model. +- ServiceLoader makes activation implicit on classpath presence. A vendor JAR drifting onto a cluster could silently change S3 auth behavior. The config key makes activation explicit. +- The activation key (`fs.s3a.comet.credential.provider.class`, with per-bucket override) follows the same shape as `fs.s3a.bucket..aws.credentials.provider`, so operators do not learn a new pattern. + +Activation is modeled on `parquet.crypto.factory.class` (Parquet Modular Encryption KMS, see Comet #2447): the user names a single vendor class and the vendor dispatches across multiple credential backends inside that class if they need to. This mirrors how Iceberg's `DecryptionPropertiesFactory` already behaves for Parquet keys. + +## Why `(FQCN, dispatchKey, catalogProperties)` keying + +Comet caches one provider instance per `(FQCN, dispatchKey, catalogProperties)` triple. The dispatch key is the Spark V2 catalog name on the Iceberg path and the bucket on the Parquet path. + +- Two catalogs that share one provider class (typical in multi-tenant deployments) need isolated `initialize` maps because their `catalogProperties` differ. Without `dispatchKey`, the second `initialize` would either overwrite the first or be silently skipped. +- The bucket as `dispatchKey` for Parquet gives vendors per-bucket isolation when the same provider is named under several `fs.s3a.bucket..comet.credential.provider.class` keys. +- `catalogProperties` enters the key to handle multi-tenant JVMs (Spark Connect, Thrift Server, `SparkSession.newSession()`) where two sessions can configure the same provider class against the same `dispatchKey` but with different REST endpoints, OAuth tokens, or vendor keys. Without it the second session would silently use the first session's credentials. +- Keying solely by FQCN would force vendors to encode multi-tenant routing in static state. The triple-key keeps each call site independent. + +`ensureInitialized` returns a `long` handle that the native bridge stashes and replays on every per-request call. Routing per-request lookups by handle avoids re-sending the property bag across JNI on the hot path and unambiguously selects the right provider when the same `(FQCN, dispatchKey)` pair maps to multiple instances. + +## Why fresh construction in `initialize`, not probing a JVM-wide static + +A provider implementation might be tempted to probe an existing static populated elsewhere (e.g. by a Hadoop S3A signer's `registerStore` callback) and reuse the credential cache that the Hadoop path uses. That fails on Comet-only executors: + +- The driver JVM hits `S3AFileSystem.initialize` during analysis (raw `s3a://` paths) or during Hadoop catalog manifest reads (Iceberg with Hadoop catalog), so the static is populated there. +- The driver may not hit `S3AFileSystem` at all under Iceberg with REST catalog plus `S3FileIO`, because `S3FileIO` calls AWS SDK directly without going through the Hadoop layer. The static stays null. +- Executors with Comet-only reads never instantiate `S3AFileSystem`. The data path is `object_store` (raw Parquet) or `opendal` via `iceberg-rust` (Iceberg native scan). Neither touches Hadoop S3A. The static stays null on every executor. + +Constructing a fresh provider from `catalogProperties` plus `SparkEnv` is the only strategy that works across all four cases. The trade-off is that on the driver (and any JVM where Hadoop S3A is also active), two credential caches now exist for the same identity: one inside the Hadoop signer's provider, one inside the SPI implementation's. The vendor pays for this with a small number of extra AS round-trips on cold starts and TTL boundaries. A future optional optimization could probe the static first and reuse if non-null, falling back to fresh construction otherwise. + +## Why no Comet-side cache + +Comet's bridge does not maintain a TTL cache, schedule refresh, or broadcast catalog state. All of that is the vendor's responsibility: + +- Iceberg vendors get `software.amazon.awssdk.utils.cache.CachedSupplier` for free inside `org.apache.iceberg.aws.s3.VendedCredentialsProvider`. +- Custom-STS vendors write whatever cache fits their refresh model. +- Driver-only state (e.g. a JWT minted at plan time) is distributed via `initialize`'s `catalogProperties`, which Comet has already serialized through the native plan op for the Iceberg path. On the Parquet path vendors read from Hadoop conf via `SparkEnv`. + +A Comet-side cache would have to either expose a tuning knob (TTL, max size, eviction policy) and grow over time, or be hardcoded and surprise vendors whose policies disagree. The bridge intentionally has neither and forwards every call. + +## Path-specific behavior + +`object_store::CredentialProvider` and `reqsign_core::ProvideCredential` differ in what they consume: + +| Concern | Parquet (`object_store`) | Iceberg (`opendal` via `reqsign-core`) | +| ----------------------- | -------------------------------------------------- | ------------------------------------------------------------ | +| Trait method | `get_credential() -> AwsCredential` | `provide_credential(...) -> Option` | +| Returns expiry? | No (only key/secret/token) | Yes (`expires_in: Option`) | +| Comet-side TTL wrapper? | None. Bridge passed straight to `with_credentials` | None. `opendal` schedules the next refresh from `expires_in` | +| When SPI is called | Every `get_credential()` call | When `expires_in` is exceeded | +| Vendor returns 0 expiry | Field has no use | Bridge substitutes 5 minutes to bound staleness | + +The 5-minute fallback is a safety net so a vendor that omits expiry cannot leave `opendal` caching a stale token indefinitely. It is intentionally not a configuration knob. + +## Property-bag handling on the Iceberg path + +The full unfiltered FileIO property bag crosses JNI as `catalog_properties`. The storage-prefix filter (`s3.`/`gcs.`/`adls.`/`client.`) is applied native-side in `iceberg_scan.rs::load_file_io` immediately before `FileIOBuilder.with_prop`. This means the bridge sees `credentials.uri`, OAuth tokens, and any vendor-custom keys with no parallel field on the operator and no driver-side broadcast. Vendors set their own keys on the catalog config and read them back inside `initialize(Map)`. + +`IcebergScanExec` derives a redacting `Debug` so plan dumps and tracing do not leak the property bag. + +## Returns or throws, not a fall-through value + +The SPI returns a `CometS3Credentials` or throws. There is no sentinel "I do not know" return. Vendors that are only authoritative for some paths resolve the default AWS chain themselves for the rest and return the result. This matches the contract on every other AWS credential SPI in the JVM ecosystem (AWS SDK v1/v2, Hadoop S3A, Iceberg `VendedCredentialsProvider`). + +## Lifecycle: `AutoCloseable` plus a JVM shutdown hook + +`CometS3CredentialProvider` extends `AutoCloseable` with a default no-op `close()`. The dispatcher installs a JVM shutdown hook that iterates every cached instance and calls `close()`, swallowing per-provider exceptions so a slow or buggy vendor cannot block other providers from cleaning up. Stateless providers ignore this entirely; vendors that hold long-lived resources (HTTP clients, scheduled-refresh executors, STS connection pools) override `close()` to release them. Shutdown hooks are best-effort, so a `SIGKILL` or abrupt JVM termination skips them; vendors must not rely on `close()` for correctness, only for resource hygiene. + +## Iceberg path: error message fidelity caveat + +When the bridge is wired into `iceberg-rust`, the outer `reqsign-core::ProvideCredentialChain` currently swallows thrown exceptions into "no credential" before the request reaches `opendal`. The credential is still not issued and the request still fails, but the message is degraded to an opaque anonymous-request failure. No Comet change fixes this; it is resolved upstream when `iceberg-rust` stops wrapping custom loaders in its outer chain or moves its S3 backend to `object_store`. diff --git a/docs/source/user-guide/latest/index.rst b/docs/source/user-guide/latest/index.rst index 9587b2ee03..e284ea1df9 100644 --- a/docs/source/user-guide/latest/index.rst +++ b/docs/source/user-guide/latest/index.rst @@ -50,4 +50,5 @@ to read more. Tuning Guide Metrics Guide Iceberg Guide + S3 Credential Providers Kubernetes Guide diff --git a/docs/source/user-guide/latest/s3-credential-providers.md b/docs/source/user-guide/latest/s3-credential-providers.md new file mode 100644 index 0000000000..7a98bf3c2a --- /dev/null +++ b/docs/source/user-guide/latest/s3-credential-providers.md @@ -0,0 +1,267 @@ + + +# S3 Credential Providers + +Comet's native S3 readers normally fetch credentials from the standard AWS credential chain (static keys, instance profiles, environment variables, etc.). Some clusters use a vendor-managed mechanism instead, where credentials are issued per request based on a JWT or per S3 path. For those clusters, Comet supports loading a vendor-supplied bridge class that routes every native credential request through the vendor's Java code. + +## Do I need this? + +You don't, if any of the following describe your cluster: + +- You use static AWS credentials (`fs.s3a.access.key` / `fs.s3a.secret.key`). +- You use EC2 instance profiles, EKS pod identities, ECS task roles, or environment variables. +- Your S3 access works in Spark today via the default AWS credential chain. + +You probably do, if any of these are true: + +- You have a Hadoop S3A custom signer configured (`fs.s3a.custom.signers=...`). +- You have a Spark `CloudCredentialsProvider` that issues a JWT for a vendor STS service. +- You have a custom Iceberg `client.factory` that injects a configured S3 client. +- Spark queries against your S3 paths work, but the same queries with Comet enabled fail with 403. + +## Enabling a bridge + +A bridge is activated by naming the vendor's class in a Spark config. Putting a JAR on the classpath alone has no effect; the config key must be set. + +For raw Parquet (the `object_store` path), set the Hadoop S3A config: + +``` +spark.hadoop.fs.s3a.comet.credential.provider.class=com.vendor.MyCometCredentialProvider +``` + +A per-bucket override is supported and follows the same shape as `fs.s3a.bucket..aws.credentials.provider`: + +``` +spark.hadoop.fs.s3a.bucket..comet.credential.provider.class=com.vendor.MyCometCredentialProvider +``` + +For Iceberg (the `opendal` path), set the per-catalog property under the `s3.` namespace Iceberg already uses for its S3 settings: + +``` +spark.sql.catalog..s3.comet.credential.provider.class=com.vendor.MyCometCredentialProvider +``` + +Add the vendor JAR to your Spark executor classpath: + +```sh +spark-submit --jars vendor-comet-bridge.jar ... +``` + +OSS Comet ships no vendor-specific bridges. Get one from the same vendor that supplies your Hadoop S3A signer or Iceberg client factory. If they do not yet provide one, send them to the "Writing a bridge" section below. + +## Verification + +With the config set and the JAR on the classpath, executor logs show on first S3 access: + +- Info level: `Instantiated CometS3CredentialProvider ` +- Debug level: `Fetching credentials via (dispatchKey=) for bucket=... path=... mode=...` + +Without the config set, no credential-related log lines appear at startup; native readers use the default AWS credential chain. + +## Troubleshooting + +**`CometS3CredentialProvider class not found: `**. The class named in the config is not on the executor classpath. Re-check `--jars` / `spark.jars`. On YARN or Kubernetes, confirm the JAR actually reached the executor and not only the driver. + +**` does not implement org.apache.comet.cloud.s3.CometS3CredentialProvider`**. The configured class exists but does not implement the SPI. Double-check the FQCN against the vendor's documentation. + +**` must declare a public no-arg constructor`**. Vendor classes are instantiated reflectively with `Class.forName(name).getDeclaredConstructor().newInstance()`. A non-default constructor is not supported; ask the vendor to expose a no-arg form that reads any state it needs from `initialize(Map)` or environment. + +**`CometS3CredentialProvider (dispatchKey=...) was not initialized`**. `initialize(Map)` was not called before a credential request. Comet should always invoke `ensureInitialized` synchronously when it builds the bridge at plan time, so this indicates the bridge skipped the init call (a Comet bug) or the vendor's `initialize` threw and the bridge fell through to the default chain. + +**`403 AccessDenied` with the bridge configured.** The provider returned credentials but they were rejected by S3. Most often a region mismatch (see Iceberg section below) or expired session token; enable debug logging on the vendor's class to confirm what it returned. + +**Credentials silently going stale during long-running jobs.** When a vendor returns `expirationEpochMillis=0`, the bridge substitutes a 5-minute expiry before handing the credential to `opendal`, so `opendal`'s cache cannot hold a stale credential indefinitely. Returning a real expiry is preferred; the 5-minute fallback is a safety net, not a knob. + +## Iceberg: explicit S3 region required + +With the bridge configured, Comet wires a custom credential loader into `iceberg-storage-opendal`. `opendal`'s built-in S3 region auto-detection only runs when no custom loader is configured, so on the bridge path the region (and endpoint for non-AWS) must be set explicitly on the Spark catalog: + +``` +spark.sql.catalog..s3.region = us-east-1 +spark.sql.catalog..s3.endpoint = https://... (non-AWS only) +spark.sql.catalog..s3.path-style-access = true (path-style endpoints only) +``` + +If you hit `region is missing. Please find it by S3::detect_region() or set them in env`, this is the missing config. + +## Writing a bridge + +Comet's native scan paths (`object_store` for raw Parquet, `opendal` via `iceberg-rust` for Iceberg) bypass Hadoop S3A entirely. The standard `AWSCredentialsProvider.getCredentials()` has no path argument, so vendors that issue per-path STS credentials cannot expose them through it. The `CometS3CredentialProvider` SPI fills that gap. + +Implement `org.apache.comet.cloud.s3.CometS3CredentialProvider`: + +```java +package org.apache.comet.cloud.s3; + +public interface CometS3CredentialProvider extends AutoCloseable { + /** Called once per (FQCN, dispatchKey, catalogProperties) before any per-request call. Optional. */ + default void initialize(java.util.Map catalogProperties) {} + + CometS3Credentials getCredentialsForPath(CometS3CredentialContext context) throws Exception; + + /** Invoked from the dispatcher's JVM shutdown hook. Default is a no-op. */ + @Override default void close() throws Exception {} +} +``` + +The class must have a public no-arg constructor. `getCredentialsForPath` may be invoked concurrently from many native tokio worker threads; the implementation must be thread-safe. The supplied `CometS3CredentialContext` exposes `getBucket()`, `getPath()`, and `getMode()`; future Comet releases may add accessors here without changing the method signature, so vendors compiled against today's API stay binary-compatible. + +### Lifecycle + +Comet keys provider instances by `(FQCN, dispatchKey, catalogProperties)`. The dispatch key is the Spark V2 catalog name on the Iceberg path and the S3 bucket name on the Parquet path. The first time a given key is seen on an executor, Comet reflects the class, calls `initialize(Map)` exactly once, and caches the instance for the JVM lifetime. Two catalogs sharing one provider FQCN therefore get isolated instances with their own `initialize` maps. Including `catalogProperties` in the key matters in multi-tenant JVMs (Spark Connect, Thrift Server, `SparkSession.newSession()`) where two sessions can otherwise collide on the same `(FQCN, dispatchKey)` and have the second session silently use the first session's credentials. + +`initialize` should be cheap and non-blocking. Defer real credential fetches (REST round-trips, STS calls) to the first `getCredentialsForPath` invocation. On the Iceberg path the supplied `catalogProperties` carries the unfiltered FileIO bag, including REST-vended fields like `credentials.uri`, OAuth tokens, and any vendor-custom keys you set on the catalog config. The map may contain secrets, so do not log it. + +`close()` is invoked from a JVM shutdown hook installed by the dispatcher. The default no-op is fine for stateless providers. Override it to release HTTP clients, scheduled-refresh executors, or STS connection pools. Shutdown hooks are best-effort: a `SIGKILL` or abrupt JVM termination skips them, so do not depend on `close()` for correctness. + +### Caching, refresh, and distribution + +Comet does not maintain a TTL cache, broadcast catalog state, or schedule refresh. Vendors decide: + +- Whether to cache credentials and for how long. Iceberg vendors get `software.amazon.awssdk.utils.cache.CachedSupplier` for free inside `VendedCredentialsProvider`; vendors with custom STS write whatever cache fits. +- When to refresh: proactive timer, on-demand at expiry, on `403` retry, etc. +- How to distribute driver-only state. Read it from `initialize`'s `catalogProperties` (which Comet has already serialized through the native plan op), call back to a vendor service from the executor, or run your own Spark broadcast inside the class. + +`expirationEpochMillis` only matters on the Iceberg/`opendal` path. There the bridge implements `reqsign_core::ProvideCredential`, which carries an `expires_in` field that `opendal` uses to schedule the next refresh. Publish a real expiry when you have one. `0` means "unknown"; the bridge then substitutes a 5-minute expiry to bound staleness. + +The Parquet/`object_store` path has no expiry concept: `object_store::CredentialProvider` returns just `AwsCredential` (key/secret/token). The bridge is passed to `with_credentials` without a TTL wrapper, so `object_store` calls into the SPI on every request and relies on the vendor's own cache for hit rates. Expiry handling is fully the vendor's responsibility: the vendor decides when its internal cache refreshes. If `object_store` receives a 403 from an expired session token, its retry layer calls `get_credential()` again, giving the vendor another chance to mint fresh credentials. + +### Returned fields + +| Field | Notes | +| ----------------------- | ------------------------------------------------------------------------------------------------------------------------- | +| `accessKeyId` | Required. | +| `secretAccessKey` | Required. | +| `sessionToken` | `null` for non-STS credentials. | +| `expirationEpochMillis` | Iceberg path only. `0` means "unknown"; the bridge substitutes a 5-minute expiry. The Parquet path has no expiry concept. | + +Provide a real `expirationEpochMillis` whenever you have one on the Iceberg path. The Parquet path's `object_store::CredentialProvider` does not consume an expiry, and the bridge invokes the SPI on every `get_credential()` call. + +### Returns or throws + +The SPI follows the same shape as the other JVM AWS-credential SPIs (AWS SDK v1/v2, Hadoop S3A, Iceberg `VendedCredentialsProvider`): return credentials or throw. There is no "fall-through" return value. + +If your provider is authoritative only for some paths, resolve the default AWS chain yourself for the rest: + +```java +private final DefaultCredentialsProvider defaultChain = DefaultCredentialsProvider.create(); + +@Override +public CometS3Credentials getCredentialsForPath(CometS3CredentialContext ctx) throws Exception { + if (handlesPath(ctx.getBucket(), ctx.getPath())) { + return mintFromMyVendorService(ctx.getBucket(), ctx.getPath(), ctx.getMode()); + } + AwsCredentials c = defaultChain.resolveCredentials(); + String token = (c instanceof AwsSessionCredentials) + ? ((AwsSessionCredentials) c).sessionToken() + : null; + return new CometS3Credentials(c.accessKeyId(), c.secretAccessKey(), token, 0L); +} +``` + +### Composing multiple credential backends + +A single configured provider class is the dispatcher. If a vendor needs to route across several credential backends (per bucket, per path prefix, per tenant), the dispatch lives inside the vendor's class: + +```java +public final class MyCometCredentialProvider implements CometS3CredentialProvider { + private final ProdVendor prod = ...; + private final StsVendor sts = ...; + private final DefaultVendor fallback = ...; + + @Override + public CometS3Credentials getCredentialsForPath(CometS3CredentialContext ctx) throws Exception { + if (ctx.getBucket().startsWith("data-prod-")) return prod.fetch(ctx); + if (ctx.getBucket().equals("partner-shared")) return sts.assumeRole(ctx); + return fallback.fetch(ctx.getBucket(), ctx.getPath()); + } +} +``` + +Per-bucket Hadoop overrides (`fs.s3a.bucket..comet.credential.provider.class`) are also available if you prefer to ship multiple vendor classes and pick by bucket in config rather than in code. + +For Iceberg deployments where two catalogs share one provider class but need isolated state, configure the same FQCN on both catalogs and read your discriminator from `initialize`'s `catalogProperties`. Each catalog gets its own provider instance because Comet keys by `(FQCN, catalogName, catalogProperties)`: + +```java +public final class MyMultiTenantProvider implements CometS3CredentialProvider { + private volatile String tenantId; + + @Override + public void initialize(Map catalogProperties) { + this.tenantId = catalogProperties.get("vendor.tenant-id"); + } + + @Override + public CometS3Credentials getCredentialsForPath(CometS3CredentialContext ctx) { + return mintForTenant(tenantId, ctx.getBucket(), ctx.getPath(), ctx.getMode()); + } +} +``` + +### Reference implementation: Iceberg REST vended credentials + +For Iceberg REST catalogs that vend AWS credentials (`LoadTableResponse.credentials`), the canonical implementation wraps Iceberg's existing `VendedCredentialsProvider`: + +```java +public final class IcebergRESTVendedS3Provider implements CometS3CredentialProvider { + private volatile VendedCredentialsProvider provider; + + @Override + public void initialize(Map catalogProperties) { + this.provider = VendedCredentialsProvider.create(catalogProperties); + } + + @Override + public CometS3Credentials getCredentialsForPath(CometS3CredentialContext ctx) { + AwsCredentials c = provider.resolveCredentials(); + String token = (c instanceof AwsSessionCredentials) + ? ((AwsSessionCredentials) c).sessionToken() : null; + return new CometS3Credentials(c.accessKeyId(), c.secretAccessKey(), token, 0L); + } +} +``` + +`VendedCredentialsProvider` reads `credentials.uri`, the catalog endpoint, and OAuth tokens from the supplied map (Comet forwards the unfiltered FileIO bag to `initialize`), and refreshes through its own `CachedSupplier`. Caching, refresh-near-expiry, and the REST round-trip all live in Iceberg, not in Comet. Comet ships a copy of this class under `spark/src/test` as a reference; copy it into your runtime jar alongside `iceberg-aws` and AWS SDK v2. + +### Access mode + +| Value | Used for | +| ------- | -------------------------------------------------------------------------- | +| `READ` | All native scan paths (raw Parquet, Iceberg). Comet today only sends READ. | +| `WRITE` | Reserved for future native write paths. | + +A `WRITE` credential is not implicitly read-capable. Vendors that need read-during-write workflows include the required read permissions in the IAM policy attached to their `WRITE` credentials. + +### Build setup + +Vendor implementations need the Comet SPI classes at compile time only. Use `provided`-scope: + +```xml + + org.apache.datafusion + comet-spark-spark${spark.version.short}_${scala.binary.version} + ${comet.version} + provided + +``` + +### Iceberg path: error message fidelity + +When the bridge is wired into `iceberg-rust`, the outer `reqsign-core::ProvideCredentialChain` currently swallows thrown exceptions into "no credential" before the request reaches `opendal`. The credential is still not issued and the request still fails, only the message is degraded to an opaque anonymous-request failure. No Comet change fixes this; it is resolved upstream if `iceberg-rust` stops wrapping custom loaders in its outer chain or moves its S3 backend to `object_store`. diff --git a/native/Cargo.lock b/native/Cargo.lock index 11e9b1ccff..a02b95819c 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -675,6 +675,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ec6fb3fe69024a75fa7e1bfb48aa6cf59706a101658ea01bfd33b2b248a038f" dependencies = [ "aws-lc-sys", + "untrusted 0.7.1", "zeroize", ] @@ -2010,7 +2011,7 @@ dependencies = [ "object_store", "object_store_opendal", "once_cell", - "opendal 0.56.0", + "opendal", "parking_lot", "parquet", "paste", @@ -2018,6 +2019,7 @@ dependencies = [ "procfs", "prost", "rand 0.10.1", + "reqsign-core", "reqwest 0.12.28", "serde_json", "tempfile", @@ -3530,7 +3532,6 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", - "webpki-roots", ] [[package]] @@ -3583,7 +3584,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.9.0" -source = "git+https://github.com/apache/iceberg-rust?rev=1ad4bfd#1ad4bfd39319508e79960d16dad1b1cdf965c5f4" +source = "git+https://github.com/apache/iceberg-rust?rev=83b4595#83b4595e2b5f522974d24d51c8ecbd09a093fa92" dependencies = [ "aes-gcm", "anyhow", @@ -3638,7 +3639,7 @@ dependencies = [ [[package]] name = "iceberg-storage-opendal" version = "0.9.0" -source = "git+https://github.com/apache/iceberg-rust?rev=1ad4bfd#1ad4bfd39319508e79960d16dad1b1cdf965c5f4" +source = "git+https://github.com/apache/iceberg-rust?rev=83b4595#83b4595e2b5f522974d24d51c8ecbd09a093fa92" dependencies = [ "anyhow", "async-trait", @@ -3646,9 +3647,9 @@ dependencies = [ "cfg-if", "futures", "iceberg", - "opendal 0.55.0", - "reqsign", - "reqwest 0.12.28", + "opendal", + "reqsign-aws-v4", + "reqsign-core", "serde", "typetag", "url", @@ -4044,17 +4045,20 @@ dependencies = [ [[package]] name = "jsonwebtoken" -version = "9.3.1" +version = "10.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde" +checksum = "eba32bfb4ffdeaca3e34431072faf01745c9b26d25504aa7a6cf5684334fc4fc" dependencies = [ + "aws-lc-rs", "base64", + "getrandom 0.2.17", "js-sys", "pem", - "ring", "serde", "serde_json", + "signature", "simple_asn1", + "zeroize", ] [[package]] @@ -4585,7 +4589,7 @@ dependencies = [ "futures", "mea", "object_store", - "opendal 0.56.0", + "opendal", "pin-project", "tokio", ] @@ -4614,35 +4618,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" -[[package]] -name = "opendal" -version = "0.55.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d075ab8a203a6ab4bc1bce0a4b9fe486a72bf8b939037f4b78d95386384bc80a" -dependencies = [ - "anyhow", - "backon", - "base64", - "bytes", - "crc32c", - "futures", - "getrandom 0.2.17", - "http 1.4.0", - "http-body 1.0.1", - "jiff", - "log", - "md-5", - "percent-encoding", - "quick-xml 0.38.4", - "reqsign", - "reqwest 0.12.28", - "serde", - "serde_json", - "tokio", - "url", - "uuid", -] - [[package]] name = "opendal" version = "0.56.0" @@ -4655,7 +4630,12 @@ dependencies = [ "opendal-layer-logging", "opendal-layer-retry", "opendal-layer-timeout", + "opendal-service-azdls", + "opendal-service-fs", + "opendal-service-gcs", "opendal-service-hdfs", + "opendal-service-oss", + "opendal-service-s3", ] [[package]] @@ -4729,6 +4709,70 @@ dependencies = [ "tokio", ] +[[package]] +name = "opendal-service-azdls" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f9884c2d8cf8ba2bb077d79c877dac5863ba3bab9e2c9c1e41a2e0491404772" +dependencies = [ + "bytes", + "http 1.4.0", + "log", + "opendal-core", + "opendal-service-azure-common", + "quick-xml 0.38.4", + "reqsign-azure-storage", + "reqsign-core", + "reqsign-file-read-tokio", + "serde", + "serde_json", +] + +[[package]] +name = "opendal-service-azure-common" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffb0e45d6c8dcf66ce2da20e241bcb80e6e540e109a4ff20f318f6c9b4c54e0c" +dependencies = [ + "http 1.4.0", + "opendal-core", +] + +[[package]] +name = "opendal-service-fs" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf0be0417abeeb0053376d816b90fceb9ca98f20dfb54ebf1f2a282729f83663" +dependencies = [ + "bytes", + "log", + "opendal-core", + "serde", + "tokio", + "xattr", +] + +[[package]] +name = "opendal-service-gcs" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70a49477a10163431896d106136117f5670717f9c9e49cf6f710528800c6633a" +dependencies = [ + "async-trait", + "bytes", + "http 1.4.0", + "log", + "opendal-core", + "percent-encoding", + "quick-xml 0.38.4", + "reqsign-core", + "reqsign-file-read-tokio", + "reqsign-google", + "serde", + "serde_json", + "tokio", +] + [[package]] name = "opendal-service-hdfs" version = "0.56.0" @@ -4744,6 +4788,44 @@ dependencies = [ "tokio", ] +[[package]] +name = "opendal-service-oss" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c8a917829ad06d21b639558532cb0101fe49b040d946d673a73018683fac05" +dependencies = [ + "bytes", + "http 1.4.0", + "log", + "opendal-core", + "quick-xml 0.38.4", + "reqsign-aliyun-oss", + "reqsign-core", + "reqsign-file-read-tokio", + "serde", +] + +[[package]] +name = "opendal-service-s3" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dadddeb9bb50b0d30927dd914c298c4ddca47e4c1cfa7674d311f0cf9b051c8" +dependencies = [ + "base64", + "bytes", + "crc32c", + "http 1.4.0", + "log", + "md-5", + "opendal-core", + "quick-xml 0.38.4", + "reqsign-aws-v4", + "reqsign-core", + "reqsign-file-read-tokio", + "serde", + "url", +] + [[package]] name = "openssl-probe" version = "0.2.1" @@ -5288,16 +5370,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "quick-xml" -version = "0.37.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb" -dependencies = [ - "memchr", - "serde", -] - [[package]] name = "quick-xml" version = "0.38.4" @@ -5401,7 +5473,6 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ca0ecfa931c29007047d1bc58e623ab12e5590e8c7cc53200d5202b69266d8a" dependencies = [ - "libc", "rand_chacha 0.3.1", "rand_core 0.6.4", ] @@ -5556,35 +5627,63 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" [[package]] -name = "reqsign" -version = "0.16.5" +name = "reqsign-aliyun-oss" +version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43451dbf3590a7590684c25fb8d12ecdcc90ed3ac123433e500447c7d77ed701" +checksum = "57ac2757f3140aa2e213b554148ae0b52733e624fc6723f0cc6bb3d440176c95" +dependencies = [ + "anyhow", + "form_urlencoded", + "http 1.4.0", + "log", + "percent-encoding", + "reqsign-core", + "rust-ini", + "serde", + "serde_json", +] + +[[package]] +name = "reqsign-aws-v4" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44eaca382e94505a49f1a4849658d153aebf79d9c1a58e5dd3b10361511e9f43" +dependencies = [ + "anyhow", + "bytes", + "form_urlencoded", + "http 1.4.0", + "log", + "percent-encoding", + "quick-xml 0.39.2", + "reqsign-core", + "rust-ini", + "serde", + "serde_json", + "serde_urlencoded", + "sha1", +] + +[[package]] +name = "reqsign-azure-storage" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a321980405d596bd34aaf95c4722a3de4128a67fd19e74a81a83aa3fdf082e6" dependencies = [ "anyhow", - "async-trait", "base64", - "chrono", + "bytes", "form_urlencoded", - "getrandom 0.2.17", - "hex", - "hmac 0.12.1", - "home", "http 1.4.0", "jsonwebtoken", "log", - "once_cell", + "pem", "percent-encoding", - "quick-xml 0.37.5", - "rand 0.8.6", - "reqwest 0.12.28", + "reqsign-core", "rsa", - "rust-ini", "serde", "serde_json", "sha1", - "sha2 0.10.9", - "tokio", ] [[package]] @@ -5609,6 +5708,37 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "reqsign-file-read-tokio" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d89295b3d17abea31851cc8de55d843d89c52132c864963c38d41920613dc5" +dependencies = [ + "anyhow", + "reqsign-core", + "tokio", +] + +[[package]] +name = "reqsign-google" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35cc609b49c69e76ecaceb775a03f792d1ed3e7755ab3548d4534fd801e3242e" +dependencies = [ + "form_urlencoded", + "http 1.4.0", + "jsonwebtoken", + "log", + "percent-encoding", + "reqsign-aws-v4", + "reqsign-core", + "rsa", + "serde", + "serde_json", + "sha2 0.10.9", + "tokio", +] + [[package]] name = "reqwest" version = "0.12.28" @@ -5649,7 +5779,6 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams 0.4.2", "web-sys", - "webpki-roots", ] [[package]] @@ -5709,7 +5838,7 @@ dependencies = [ "cfg-if", "getrandom 0.2.17", "libc", - "untrusted", + "untrusted 0.9.0", "windows-sys 0.52.0", ] @@ -5880,7 +6009,7 @@ dependencies = [ "aws-lc-rs", "ring", "rustls-pki-types", - "untrusted", + "untrusted 0.9.0", ] [[package]] @@ -6875,6 +7004,12 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "untrusted" version = "0.9.0" @@ -7128,15 +7263,6 @@ dependencies = [ "rustls-pki-types", ] -[[package]] -name = "webpki-roots" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52f5ee44c96cf55f1b349600768e3ece3a8f26010c05265ab73f945bb1a2eb9d" -dependencies = [ - "rustls-pki-types", -] - [[package]] name = "which" version = "4.4.2" @@ -7564,6 +7690,16 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4" +[[package]] +name = "xattr" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32e45ad4206f6d2479085147f02bc2ef834ac85886624a23575ae137c8aa8156" +dependencies = [ + "libc", + "rustix 1.1.4", +] + [[package]] name = "xmlparser" version = "0.13.6" @@ -7639,6 +7775,20 @@ name = "zeroize" version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" +dependencies = [ + "zeroize_derive", +] + +[[package]] +name = "zeroize_derive" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] [[package]] name = "zerotrie" diff --git a/native/Cargo.toml b/native/Cargo.toml index 9e252a796c..5df04c2ca3 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -58,8 +58,9 @@ object_store = { version = "0.13.1", features = ["gcp", "azure", "aws", "http"] url = "2.2" aws-config = "1.8.17" aws-credential-types = "1.2.13" -iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "1ad4bfd" } -iceberg-storage-opendal = { git = "https://github.com/apache/iceberg-rust", rev = "1ad4bfd", features = ["opendal-all"] } +iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "83b4595" } +iceberg-storage-opendal = { git = "https://github.com/apache/iceberg-rust", rev = "83b4595", features = ["opendal-all"] } +reqsign-core = "3" [profile.release] debug = true diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index c58d446917..0d3b084ba3 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -75,6 +75,7 @@ hdfs-sys = {version = "0.3", optional = true, features = ["hdfs_3_3"]} opendal = { version = "0.56.0", optional = true, features = ["services-hdfs"] } iceberg = { workspace = true } iceberg-storage-opendal = { workspace = true } +reqsign-core = { workspace = true } serde_json = "1.0" uuid = "1.23.0" diff --git a/native/core/src/cloud/mod.rs b/native/core/src/cloud/mod.rs new file mode 100644 index 0000000000..634d37b26d --- /dev/null +++ b/native/core/src/cloud/mod.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Cloud-side helpers shared across Comet's native scan paths. +//! +//! Today this hosts the JNI bridge to the JVM `CometS3CredentialDispatcher` SPI, which is reused +//! by the raw Parquet (`object_store`) path and the Iceberg (`opendal` via `iceberg-rust`) path. + +pub mod s3; diff --git a/native/core/src/cloud/s3/credential_bridge.rs b/native/core/src/cloud/s3/credential_bridge.rs new file mode 100644 index 0000000000..14e4bc49d9 --- /dev/null +++ b/native/core/src/cloud/s3/credential_bridge.rs @@ -0,0 +1,361 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! JNI bridge to the JVM `CometS3CredentialDispatcher` SPI, exposed as +//! `object_store::CredentialProvider` (raw Parquet path) and `reqsign_core::ProvideCredential` +//! (Iceberg via `opendal`). See `docs/source/contributor-guide/s3-credential-provider-design.md`. + +use crate::execution::operators::ExecutionError; +use crate::jvm_bridge::{jni_new_global_ref, jni_static_call, JVMClasses}; +use async_trait::async_trait; +use iceberg_storage_opendal::AwsCredential as IcebergAwsCredential; +use jni::objects::{Global, JFieldID, JObject, JString, JValue}; +use jni::signature::{Primitive, ReturnType}; +use jni::strings::JNIString; +use jni::sys::jint; +use log::warn; +use object_store::aws::AwsCredential; +use object_store::CredentialProvider; +use once_cell::sync::OnceCell; +use reqsign_core::time::Timestamp; +use reqsign_core::{ + Context, Error as ReqsignError, ErrorKind as ReqsignErrorKind, + ProvideCredential as IcebergProvideCredential, +}; +use std::collections::HashMap; +use std::fmt; +use std::sync::Arc; +use std::time::Duration; + +/// Cap on opendal's credential cache when the provider does not report an expiry. Prevents the +/// executor from holding a stale credential for the entire job lifetime. +const DEFAULT_EXPIRY_WHEN_UNKNOWN: Duration = Duration::from_secs(300); + +/// Once-per-process latch for the "missing expiry" warning. Bridges are per-scan, so a per-bridge +/// latch would re-log on every scan. +static WARNED_MISSING_EXPIRY: OnceCell<()> = OnceCell::new(); + +/// Access intent forwarded to the Java SPI. Ordinal must match the JVM `CometS3AccessMode` enum. +#[derive(Debug, Clone, Copy)] +pub enum AccessMode { + Read = 0, + #[allow(dead_code)] + Write = 1, +} + +/// Per-scan credential provider that delegates to the JVM SPI via JNI. `handle` is the JVM-side +/// identity for the `(provider_class, dispatch_key, catalog_properties)` triple returned by +/// `ensureInitialized`. `bucket_jstr` / `path_jstr` are interned once at construction to avoid +/// per-call `new_string` allocations on the hot path. +/// +/// Granularity: although the JVM SPI accepts `(bucket, path)`, neither +/// `object_store::CredentialProvider::get_credential` nor +/// `reqsign_core::ProvideCredential::provide_credential` carries a per-request path, so the +/// effective identity is per-bucket (Parquet) or per-table-location (Iceberg). +pub struct CometS3CredentialBridge { + provider_class: String, + dispatch_key: String, + bucket: String, + path: String, + mode: AccessMode, + handle: i64, + bucket_jstr: Arc>>, + path_jstr: Arc>>, +} + +impl fmt::Debug for CometS3CredentialBridge { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("CometS3CredentialBridge") + .field("provider_class", &self.provider_class) + .field("dispatch_key", &self.dispatch_key) + .field("handle", &self.handle) + .field("bucket", &self.bucket) + .field("path", &self.path) + .field("mode", &self.mode) + .finish() + } +} + +impl CometS3CredentialBridge { + pub fn new( + provider_class: impl Into, + dispatch_key: impl Into, + bucket: impl Into, + path: impl Into, + mode: AccessMode, + catalog_properties: &HashMap, + ) -> Result { + let provider_class = provider_class.into(); + let dispatch_key = dispatch_key.into(); + let bucket = bucket.into(); + let path = path.into(); + + let (bucket_jstr, path_jstr) = JVMClasses::with_env(|env| -> Result<_, ExecutionError> { + let b = env + .new_string(&bucket) + .map_err(|e| ExecutionError::GeneralError(format!("new_string(bucket): {e}")))?; + let p = env + .new_string(&path) + .map_err(|e| ExecutionError::GeneralError(format!("new_string(path): {e}")))?; + let b_g = + Arc::new(jni_new_global_ref!(env, b).map_err(|e| { + ExecutionError::GeneralError(format!("global_ref(bucket): {e}")) + })?); + let p_g = Arc::new( + jni_new_global_ref!(env, p) + .map_err(|e| ExecutionError::GeneralError(format!("global_ref(path): {e}")))?, + ); + Ok((b_g, p_g)) + })?; + + let handle = ensure_initialized(&provider_class, &dispatch_key, catalog_properties)?; + Ok(Self { + provider_class, + dispatch_key, + bucket, + path, + mode, + handle, + bucket_jstr, + path_jstr, + }) + } + + fn fetch_raw(&self) -> Result { + JVMClasses::with_env(|env| -> Result { + let mode = self.mode as jint; + + let creds_obj: JObject = unsafe { + jni_static_call!(env, + comet_s3_credential_dispatcher.get_credentials_for_path( + self.handle, + self.bucket_jstr.as_obj(), + self.path_jstr.as_obj(), + mode + ) -> JObject + )? + }; + if creds_obj.is_null() { + return Err(ExecutionError::GeneralError( + "getCredentialsForPath returned null (contract violation)".to_string(), + )); + } + + let d = &JVMClasses::get().comet_s3_credential_dispatcher; + Ok(RawCredentials { + access_key_id: read_required_string( + env, + &creds_obj, + d.field_access_key_id, + "accessKeyId", + )?, + secret_access_key: read_required_string( + env, + &creds_obj, + d.field_secret_access_key, + "secretAccessKey", + )?, + session_token: read_optional_string(env, &creds_obj, d.field_session_token)?, + expiration_epoch_millis: unsafe { + env.get_field_unchecked( + &creds_obj, + d.field_expiration_epoch_millis, + ReturnType::Primitive(Primitive::Long), + ) + } + .and_then(|v| v.j()) + .map_err(|e| { + ExecutionError::GeneralError(format!("read expirationEpochMillis: {e}")) + })?, + }) + }) + } +} + +fn ensure_initialized( + provider_class: &str, + dispatch_key: &str, + catalog_properties: &HashMap, +) -> Result { + JVMClasses::with_env(|env| -> Result { + let provider_class_jstr = env.new_string(provider_class).map_err(|e| { + ExecutionError::GeneralError(format!("new_string(provider_class): {e}")) + })?; + let dispatch_key_jstr = env + .new_string(dispatch_key) + .map_err(|e| ExecutionError::GeneralError(format!("new_string(dispatch_key): {e}")))?; + let props_obj = build_java_string_map(env, catalog_properties)?; + + let handle: i64 = unsafe { + jni_static_call!(env, + comet_s3_credential_dispatcher.ensure_initialized( + &provider_class_jstr, &dispatch_key_jstr, &props_obj + ) -> i64 + )? + }; + Ok(handle) + }) +} + +/// Construct a `java.util.HashMap` and populate it. Called once per bridge at +/// construction, so per-call HashMap/put cost stays off the hot path. +fn build_java_string_map<'a>( + env: &mut jni::Env<'a>, + map: &HashMap, +) -> Result, ExecutionError> { + let hashmap_class = env + .find_class(JNIString::new("java/util/HashMap")) + .map_err(|e| ExecutionError::GeneralError(format!("find_class(HashMap): {e}")))?; + let ctor = env + .get_method_id( + &hashmap_class, + jni::jni_str!(""), + jni::jni_sig!("(I)V"), + ) + .map_err(|e| ExecutionError::GeneralError(format!("HashMap.(I): {e}")))?; + let put = env + .get_method_id( + &hashmap_class, + jni::jni_str!("put"), + jni::jni_sig!("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;"), + ) + .map_err(|e| ExecutionError::GeneralError(format!("HashMap.put: {e}")))?; + let initial_capacity = JValue::Int(map.len() as jint); + let instance = + unsafe { env.new_object_unchecked(&hashmap_class, ctor, &[initial_capacity.as_jni()]) } + .map_err(|e| ExecutionError::GeneralError(format!("new HashMap(int): {e}")))?; + + for (k, v) in map { + let k_jstr = env + .new_string(k) + .map_err(|e| ExecutionError::GeneralError(format!("new_string(key): {e}")))?; + let v_jstr = env + .new_string(v) + .map_err(|e| ExecutionError::GeneralError(format!("new_string(value): {e}")))?; + let prev = unsafe { + env.call_method_unchecked( + &instance, + put, + ReturnType::Object, + &[ + JValue::Object(&k_jstr).as_jni(), + JValue::Object(&v_jstr).as_jni(), + ], + ) + } + .map_err(|e| ExecutionError::GeneralError(format!("HashMap.put call: {e}")))?; + // Discard return value; Java would have reused the existing key but our maps have no dupes. + let _ = prev.l(); + } + + Ok(instance) +} + +struct RawCredentials { + access_key_id: String, + secret_access_key: String, + session_token: Option, + /// Absolute expiry. `0` means the provider did not report one. + expiration_epoch_millis: i64, +} + +#[async_trait] +impl CredentialProvider for CometS3CredentialBridge { + type Credential = AwsCredential; + + async fn get_credential(&self) -> object_store::Result> { + let raw = self.fetch_raw().map_err(|e| object_store::Error::Generic { + store: "S3", + source: e.to_string().into(), + })?; + Ok(Arc::new(AwsCredential { + key_id: raw.access_key_id, + secret_key: raw.secret_access_key, + token: raw.session_token, + })) + } +} + +impl IcebergProvideCredential for CometS3CredentialBridge { + type Credential = IcebergAwsCredential; + + async fn provide_credential( + &self, + _ctx: &Context, + ) -> reqsign_core::Result> { + let raw = self + .fetch_raw() + .map_err(|e| ReqsignError::new(ReqsignErrorKind::CredentialInvalid, e.to_string()))?; + + let expires_in = if raw.expiration_epoch_millis > 0 { + Some( + Timestamp::from_millisecond(raw.expiration_epoch_millis).map_err(|e| { + ReqsignError::new( + ReqsignErrorKind::CredentialInvalid, + format!( + "Invalid expirationEpochMillis {}: {e}", + raw.expiration_epoch_millis + ), + ) + })?, + ) + } else { + if WARNED_MISSING_EXPIRY.set(()).is_ok() { + warn!( + "CometS3CredentialProvider returned credentials without expiration; \ + defaulting to {}s expiry to bound opendal caching", + DEFAULT_EXPIRY_WHEN_UNKNOWN.as_secs() + ); + } + Some(Timestamp::now() + DEFAULT_EXPIRY_WHEN_UNKNOWN) + }; + + Ok(Some(IcebergAwsCredential { + access_key_id: raw.access_key_id, + secret_access_key: raw.secret_access_key, + session_token: raw.session_token, + expires_in, + })) + } +} + +fn read_required_string( + env: &mut jni::Env, + instance: &JObject, + field: JFieldID, + name: &str, +) -> Result { + read_optional_string(env, instance, field)? + .ok_or_else(|| ExecutionError::GeneralError(format!("{name} was null"))) +} + +fn read_optional_string( + env: &mut jni::Env, + instance: &JObject, + field: JFieldID, +) -> Result, ExecutionError> { + let value = unsafe { env.get_field_unchecked(instance, field, ReturnType::Object) } + .and_then(|v| v.l()) + .map_err(|e| ExecutionError::GeneralError(format!("get_field_unchecked: {e}")))?; + if value.is_null() { + return Ok(None); + } + let jstr = unsafe { JString::from_raw(env, value.into_raw()) }; + jstr.try_to_string(env) + .map(Some) + .map_err(|e| ExecutionError::GeneralError(format!("try_to_string: {e}"))) +} diff --git a/native/core/src/cloud/s3/mod.rs b/native/core/src/cloud/s3/mod.rs new file mode 100644 index 0000000000..054919a691 --- /dev/null +++ b/native/core/src/cloud/s3/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod credential_bridge; diff --git a/native/core/src/execution/operators/iceberg_scan.rs b/native/core/src/execution/operators/iceberg_scan.rs index 55bcbef349..ce1388f78f 100644 --- a/native/core/src/execution/operators/iceberg_scan.rs +++ b/native/core/src/execution/operators/iceberg_scan.rs @@ -40,8 +40,10 @@ use datafusion::physical_plan::{ use futures::{Stream, StreamExt, TryStreamExt}; use iceberg::arrow::ScanMetrics; use iceberg::io::{FileIO, FileIOBuilder, StorageFactory}; +use iceberg_storage_opendal::CustomAwsCredentialLoader; use iceberg_storage_opendal::OpenDalStorageFactory; +use crate::cloud::s3::credential_bridge::{AccessMode, CometS3CredentialBridge}; use crate::execution::operators::ExecutionError; use crate::parquet::parquet_support::SparkParquetOptions; use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory; @@ -49,10 +51,13 @@ use datafusion_comet_spark_expr::EvalMode; use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory}; use iceberg::scan::FileScanTask; +/// Activation key for the `CometS3CredentialProvider` SPI on the Iceberg path, read from a Spark +/// catalog's `s3.*` property bag. +const ICEBERG_PROVIDER_CLASS_PROPERTY: &str = "s3.comet.credential.provider.class"; + /// Iceberg table scan operator that uses iceberg-rust to read Iceberg tables. /// /// Executes pre-planned FileScanTasks for efficient parallel scanning. -#[derive(Debug)] pub struct IcebergScanExec { /// Iceberg table metadata location for FileIO initialization metadata_location: String, @@ -60,8 +65,13 @@ pub struct IcebergScanExec { output_schema: SchemaRef, /// Cached execution plan properties plan_properties: Arc, - /// Catalog-specific configuration for FileIO + /// Catalog-specific configuration for FileIO. Holds the unfiltered FileIO property bag, which + /// may contain OAuth tokens, REST `credentials.uri`, and other secrets the credential bridge + /// needs. Redacted in `Debug` so plan dumps and tracing do not leak credentials. catalog_properties: HashMap, + /// Spark V2 catalog name; forwarded as dispatchKey to the credential bridge. Empty when the + /// table has no catalog identity. + catalog_name: String, /// Pre-planned file scan tasks tasks: Vec, /// Number of data files to read concurrently @@ -75,6 +85,7 @@ impl IcebergScanExec { metadata_location: String, schema: SchemaRef, catalog_properties: HashMap, + catalog_name: String, tasks: Vec, data_file_concurrency_limit: usize, ) -> Result { @@ -88,6 +99,7 @@ impl IcebergScanExec { output_schema, plan_properties, catalog_properties, + catalog_name, tasks, data_file_concurrency_limit, metrics, @@ -154,7 +166,11 @@ impl IcebergScanExec { context: Arc, ) -> DFResult { let output_schema = Arc::clone(&self.output_schema); - let file_io = Self::load_file_io(&self.catalog_properties, &self.metadata_location)?; + let file_io = Self::load_file_io( + &self.catalog_properties, + &self.metadata_location, + &self.catalog_name, + )?; let batch_size = context.session_config().batch_size(); let metrics = IcebergScanMetrics::new(&self.metrics); @@ -199,7 +215,11 @@ impl IcebergScanExec { Ok(Box::pin(wrapped_stream)) } - fn storage_factory_for(path: &str) -> Result, DataFusionError> { + fn storage_factory_for( + path: &str, + catalog_properties: &HashMap, + catalog_name: &str, + ) -> Result, DataFusionError> { let scheme = if path.contains("://") { path.split("://").next().unwrap_or("file") } else { @@ -207,9 +227,13 @@ impl IcebergScanExec { }; match scheme { "file" => Ok(Arc::new(OpenDalStorageFactory::Fs)), - "s3" | "s3a" => Ok(Arc::new(OpenDalStorageFactory::S3 { - customized_credential_load: None, - })), + "s3" | "s3a" => { + let customized_credential_load = + build_s3_credential_loader(path, catalog_properties, catalog_name); + Ok(Arc::new(OpenDalStorageFactory::S3 { + customized_credential_load, + })) + } "gs" => Ok(Arc::new(OpenDalStorageFactory::Gcs)), "oss" => Ok(Arc::new(OpenDalStorageFactory::Oss)), _ => Err(DataFusionError::Execution(format!( @@ -221,18 +245,67 @@ impl IcebergScanExec { fn load_file_io( catalog_properties: &HashMap, metadata_location: &str, + catalog_name: &str, ) -> Result { - let factory = Self::storage_factory_for(metadata_location)?; + let factory = + Self::storage_factory_for(metadata_location, catalog_properties, catalog_name)?; let mut file_io_builder = FileIOBuilder::new(factory); + // Narrow to storage-prefix keys before forwarding to iceberg-rust's FileIO. The full + // unfiltered bag (catalog URI, OAuth tokens, credentials.uri, tenant-id, etc.) is kept + // upstream so CometS3CredentialBridge can read whatever the vendor needs. for (key, value) in catalog_properties { - file_io_builder = file_io_builder.with_prop(key, value); + if STORAGE_PROPERTY_PREFIXES.iter().any(|p| key.starts_with(p)) { + file_io_builder = file_io_builder.with_prop(key, value); + } } Ok(file_io_builder.build()) } } +const STORAGE_PROPERTY_PREFIXES: &[&str] = &["s3.", "gcs.", "adls.", "client."]; + +/// Wires the configured Comet credential provider into opendal's S3 service, or returns `None` +/// so opendal falls back to its default credential chain. +fn build_s3_credential_loader( + metadata_location: &str, + catalog_properties: &HashMap, + catalog_name: &str, +) -> Option { + let url = url::Url::parse(metadata_location).ok()?; + let bucket = url.host_str()?; + let provider_class = catalog_properties + .get(ICEBERG_PROVIDER_CLASS_PROPERTY) + .map(|s| s.trim()) + .filter(|s| !s.is_empty())?; + // Fall back to the bucket when the table has no catalog identity (e.g. HadoopTables loaded by + // raw path). + let dispatch_key: &str = if catalog_name.is_empty() { + bucket + } else { + catalog_name + }; + let bridge = CometS3CredentialBridge::new( + provider_class, + dispatch_key, + bucket, + url.path(), + AccessMode::Read, + catalog_properties, + ); + match bridge { + Ok(b) => Some(CustomAwsCredentialLoader::new(b)), + Err(e) => { + log::warn!( + "Failed to initialize CometS3CredentialBridge for {provider_class}: {e}; \ + falling back to default opendal credential chain" + ); + None + } + } +} + /// Metrics for IcebergScanExec struct IcebergScanMetrics { /// Baseline metrics (output rows, elapsed compute time) @@ -361,6 +434,38 @@ impl DisplayAs for IcebergScanExec { } } +impl fmt::Debug for IcebergScanExec { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("IcebergScanExec") + .field("metadata_location", &self.metadata_location) + .field("catalog_name", &self.catalog_name) + .field( + "catalog_properties", + &RedactedProperties(&self.catalog_properties), + ) + .field("num_tasks", &self.tasks.len()) + .field( + "data_file_concurrency_limit", + &self.data_file_concurrency_limit, + ) + .finish_non_exhaustive() + } +} + +/// Wraps a property map so `Debug` shows keys but elides values, since the unfiltered FileIO bag +/// may contain bearer tokens and OAuth secrets. +struct RedactedProperties<'a>(&'a HashMap); + +impl fmt::Debug for RedactedProperties<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut m = f.debug_map(); + for k in self.0.keys() { + m.key(k).value(&""); + } + m.finish() + } +} + /// Build projection expressions that adapt batches from a file schema to the target schema. /// /// The returned expressions can be cached and reused across multiple batches @@ -397,7 +502,7 @@ fn adapt_batch_with_expressions( return Ok(batch); } - // Zero-column projection (e.g. SELECT count(*)) — preserve row count + // Zero-column projection (e.g. SELECT count(*)), preserve row count if projection_exprs.is_empty() { return Ok(RecordBatch::try_new_with_options( Arc::clone(target_schema), diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 89d118b059..88f24584cc 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1483,6 +1483,7 @@ impl PhysicalPlanner { .map(|(k, v)| (k.clone(), v.clone())) .collect(); let metadata_location = common.metadata_location.clone(); + let catalog_name = common.catalog_name.clone(); let tasks = parse_file_scan_tasks_from_common(common, &scan.file_scan_tasks)?; let data_file_concurrency_limit = common.data_file_concurrency_limit as usize; @@ -1490,6 +1491,7 @@ impl PhysicalPlanner { metadata_location, required_schema, catalog_properties, + catalog_name, tasks, data_file_concurrency_limit, )?; diff --git a/native/core/src/lib.rs b/native/core/src/lib.rs index f4ae0b8834..19a2d774a0 100644 --- a/native/core/src/lib.rs +++ b/native/core/src/lib.rs @@ -65,6 +65,7 @@ pub mod jvm_bridge { use errors::{try_unwrap_or_throw, CometError, CometResult}; +pub mod cloud; pub mod execution; pub mod parquet; // this module is for non release only. Intended for debugging/profiling purposes diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs index 806800cd59..c17c24a0dd 100644 --- a/native/core/src/parquet/mod.rs +++ b/native/core/src/parquet/mod.rs @@ -24,7 +24,7 @@ pub mod schema_adapter; pub mod util; mod cast_column; -mod objectstore; +pub(crate) mod objectstore; use std::collections::HashMap; use std::task::Poll; diff --git a/native/core/src/parquet/objectstore/s3.rs b/native/core/src/parquet/objectstore/s3.rs index a427ad8ad5..f9c5f7a885 100644 --- a/native/core/src/parquet/objectstore/s3.rs +++ b/native/core/src/parquet/objectstore/s3.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use std::sync::OnceLock; use url::Url; +use crate::cloud::s3::credential_bridge::{AccessMode, CometS3CredentialBridge}; use crate::execution::jni_api::get_runtime; use async_trait::async_trait; use aws_config::{ @@ -78,11 +79,32 @@ pub fn create_store( source: "Missing bucket name in S3 URL".into(), })?; - let credential_provider = - get_runtime().block_on(build_credential_provider(configs, bucket, min_ttl))?; - builder = match credential_provider { - Some(provider) => builder.with_credentials(Arc::new(provider)), - None => builder.with_skip_signature(true), + // Parquet path: catalog_properties is empty; vendors here read from Hadoop conf. + let empty_props: HashMap = HashMap::new(); + builder = match lookup_provider_class(configs, bucket) { + Some(provider_class) => { + // Fail rather than fall back to the default chain, which could resolve to the wrong + // identity for a user who explicitly named a provider. + let bridge = CometS3CredentialBridge::new( + provider_class, + bucket, + bucket, + url.path(), + AccessMode::Read, + &empty_props, + ) + .map_err(|e| object_store::Error::Generic { + store: "S3", + source: format!("CometS3CredentialBridge init failed for {bucket}: {e}").into(), + })?; + builder.with_credentials(Arc::new(bridge)) + } + None => { + match get_runtime().block_on(build_credential_provider(configs, bucket, min_ttl))? { + Some(provider) => builder.with_credentials(Arc::new(provider)), + None => builder.with_skip_signature(true), + } + } }; let s3_configs = extract_s3_config_options(configs, bucket); @@ -287,7 +309,7 @@ fn get_config<'a>( }) } -fn get_config_trimmed<'a>( +pub(super) fn get_config_trimmed<'a>( configs: &'a HashMap, bucket: &str, property: &str, @@ -295,6 +317,17 @@ fn get_config_trimmed<'a>( get_config(configs, bucket, property).map(|s| s.trim()) } +/// Activation key (without `fs.s3a.` prefix) naming the vendor `CometS3CredentialProvider` FQCN. +/// Per-bucket override is honored via [`get_config_trimmed`]. +const PROVIDER_CLASS_PROPERTY: &str = "comet.credential.provider.class"; + +fn lookup_provider_class<'a>( + configs: &'a HashMap, + bucket: &str, +) -> Option<&'a str> { + get_config_trimmed(configs, bucket, PROVIDER_CLASS_PROPERTY).filter(|s| !s.is_empty()) +} + // Hadoop S3A credential provider constants const HADOOP_IAM_INSTANCE: &str = "org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider"; const HADOOP_SIMPLE: &str = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"; diff --git a/native/jni-bridge/src/comet_s3_credential_dispatcher.rs b/native/jni-bridge/src/comet_s3_credential_dispatcher.rs new file mode 100644 index 0000000000..b38ca0348a --- /dev/null +++ b/native/jni-bridge/src/comet_s3_credential_dispatcher.rs @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use jni::{ + errors::Result as JniResult, + objects::{JClass, JFieldID, JStaticMethodID}, + signature::{Primitive, ReturnType}, + strings::JNIString, + Env, +}; + +/// JNI handles for the JVM `CometS3CredentialDispatcher` SPI plus the `CometS3Credentials` POJO +/// whose fields the native bridge reads back. +pub struct CometS3CredentialDispatcher<'a> { + pub class: JClass<'a>, + /// Retained so the cached POJO `JFieldID`s stay alive for the executor lifetime. + #[allow(dead_code)] + pub credentials_class: JClass<'a>, + pub method_ensure_initialized: JStaticMethodID, + pub method_ensure_initialized_ret: ReturnType, + pub method_get_credentials_for_path: JStaticMethodID, + pub method_get_credentials_for_path_ret: ReturnType, + pub field_access_key_id: JFieldID, + pub field_secret_access_key: JFieldID, + pub field_session_token: JFieldID, + pub field_expiration_epoch_millis: JFieldID, +} + +impl<'a> CometS3CredentialDispatcher<'a> { + pub const JVM_CLASS: &'static str = "org/apache/comet/cloud/s3/CometS3CredentialDispatcher"; + pub const CREDENTIALS_CLASS: &'static str = "org/apache/comet/cloud/s3/CometS3Credentials"; + + pub fn new(env: &mut Env<'a>) -> JniResult> { + let class = env.find_class(JNIString::new(Self::JVM_CLASS))?; + let credentials_class = env.find_class(JNIString::new(Self::CREDENTIALS_CLASS))?; + + Ok(CometS3CredentialDispatcher { + method_ensure_initialized: env.get_static_method_id( + JNIString::new(Self::JVM_CLASS), + jni::jni_str!("ensureInitialized"), + jni::jni_sig!("(Ljava/lang/String;Ljava/lang/String;Ljava/util/Map;)J"), + )?, + method_ensure_initialized_ret: ReturnType::Primitive(Primitive::Long), + method_get_credentials_for_path: env.get_static_method_id( + JNIString::new(Self::JVM_CLASS), + jni::jni_str!("getCredentialsForPath"), + jni::jni_sig!( + "(JLjava/lang/String;Ljava/lang/String;I)Lorg/apache/comet/cloud/s3/CometS3Credentials;" + ), + )?, + method_get_credentials_for_path_ret: ReturnType::Object, + field_access_key_id: env.get_field_id( + &credentials_class, + jni::jni_str!("accessKeyId"), + jni::jni_sig!("Ljava/lang/String;"), + )?, + field_secret_access_key: env.get_field_id( + &credentials_class, + jni::jni_str!("secretAccessKey"), + jni::jni_sig!("Ljava/lang/String;"), + )?, + field_session_token: env.get_field_id( + &credentials_class, + jni::jni_str!("sessionToken"), + jni::jni_sig!("Ljava/lang/String;"), + )?, + field_expiration_epoch_millis: env.get_field_id( + &credentials_class, + jni::jni_str!("expirationEpochMillis"), + jni::jni_sig!("J"), + )?, + class, + credentials_class, + }) + } +} diff --git a/native/jni-bridge/src/lib.rs b/native/jni-bridge/src/lib.rs index d72323c961..490e80d076 100644 --- a/native/jni-bridge/src/lib.rs +++ b/native/jni-bridge/src/lib.rs @@ -191,12 +191,14 @@ mod comet_exec; pub use comet_exec::*; mod batch_iterator; mod comet_metric_node; +mod comet_s3_credential_dispatcher; mod comet_task_memory_manager; mod comet_udf_bridge; mod shuffle_block_iterator; use batch_iterator::CometBatchIterator; pub use comet_metric_node::*; +pub use comet_s3_credential_dispatcher::CometS3CredentialDispatcher; pub use comet_task_memory_manager::*; use comet_udf_bridge::CometUdfBridge; use shuffle_block_iterator::CometShuffleBlockIterator; @@ -233,6 +235,8 @@ pub struct JVMClasses<'a> { /// The CometUdfBridge class used to dispatch JVM scalar UDFs. /// `None` if the class is not on the classpath. pub comet_udf_bridge: Option>, + /// JNI handles for the CometS3CredentialDispatcher SPI and the CometS3Credentials POJO. + pub comet_s3_credential_dispatcher: CometS3CredentialDispatcher<'a>, } unsafe impl Send for JVMClasses<'_> {} @@ -310,6 +314,7 @@ impl JVMClasses<'_> { } bridge }, + comet_s3_credential_dispatcher: CometS3CredentialDispatcher::new(env).unwrap(), } }); } diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index ed1684b240..7f50aa928c 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -205,6 +205,12 @@ message IcebergScanCommon { // Number of data files to read concurrently within a single task uint32 data_file_concurrency_limit = 12; + + // Spark V2 catalog name that loaded this table. Forwarded as the dispatchKey to + // CometS3CredentialDispatcher.ensureInitialized so two catalogs sharing one provider class get + // isolated provider instances. Empty string when the table has no catalog identity (e.g. + // HadoopTables loaded by raw path). + string catalog_name = 13; } message IcebergScan { diff --git a/pom.xml b/pom.xml index 7419fecc92..dbd439717f 100644 --- a/pom.xml +++ b/pom.xml @@ -83,7 +83,7 @@ under the License. 4.13.6 2.0.17 33.2.1-jre - 1.21.0 + 1.21.4 2.31.51 ${project.basedir}/../native/target/debug darwin @@ -476,6 +476,23 @@ under the License. ${testcontainers.version} test + + + com.github.docker-java + docker-java-api + 3.7.1 + test + + + com.github.docker-java + docker-java-transport-zerodep + 3.7.1 + test +