Skip to content

feat: Credential provider support#4335

Open
karuppayya wants to merge 2 commits into
apache:mainfrom
karuppayya:COMET-4332
Open

feat: Credential provider support#4335
karuppayya wants to merge 2 commits into
apache:mainfrom
karuppayya:COMET-4332

Conversation

@karuppayya
Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Closes #4332

Rationale for this change

Comet's native Iceberg scan only supports static AWS credentials captured at plan time, so jobs running longer than the session-token lifetime will fail mid-scan.
Iceberg-rust already exposes a pluggability hook — Comet just wasn't using it.
This PR adds a CometCredentialProvider that the native scan calls back into via JNI when a refresh is needed.

What changes are included in this PR?

  • New interface for Credential provider
  • Plumbing for Native to call into JVM via JNI to fetch credentials when needed

How are these changes tested?

  • Unit tests added.
  • Tested e2e by deploying to a Spark cluster

@karuppayya
Copy link
Copy Markdown
Contributor Author

cc: @mbutrovich @parthchandra @andygrove
This follows a model similar to PME support in Comet.
(While this chnages touches number of files, most are plumbing and tests. Appreaiate extra attention with review of jni_credential_loader.rs)

@comphead
Copy link
Copy Markdown
Contributor

thanks @karuppayya thats actually a good moment for this PR.

@snmvaughan FYI

@karuppayya karuppayya changed the title Credential provioder support feat: Credential provioder support May 15, 2026
@mbutrovich
Copy link
Copy Markdown
Contributor

Thanks for putting this together @karuppayya! We're definitely thinking about the same problem right now. :)

One thing I'd like to push on before we go further. A fair amount of this PR is distribution and lifecycle wiring (the per-catalog broadcast cache, threading the broadcast through CometExecRDD and the operator, the IcebergReflection.getCatalogName lookup that keys the cache, and the initialize(catalogProperties) step that exists so the broadcast carries config). That feels like vendor territory to me rather than Comet territory. A vendor credential manager can already integrate with Spark's own credential distribution mechanisms, run its own broadcast, or stash state in a lazy singleton, none of which Comet has to know about. If the SPI is ServiceLoader-discovered, every executor finds the provider on its own classpath and the broadcast layer goes away.

My concern is scope. Comet's value is accelerating compute-intensive operations in native code, and I'd rather we not take on responsibility for reimplementing Spark's credential plumbing inside Comet when vendors are already equipped to handle it. #4309 takes the narrower position of owning only the JNI shape and the per-call context, and leaves caching, refresh, and distribution to the vendor implementation.

Is there a scenario you have in mind that #4309's shape cannot cover?

@karuppayya
Copy link
Copy Markdown
Contributor Author

@mbutrovich Appreciate the feedback — agree this is worth pinning down before going further.

The main scenario is REST-catalog deployments where vended credentials and refresh config originate in the driver's catalog context.

1. Iceberg already does exactly init+broadcast for S3FileIO. SparkBatch.planInputPartitions broadcasts the configured FileIO and
S3FileIO.initialize(Map<String,String>) runs on the driver. This change follows the same pattern.

2. I think REST makes this broadcast load-bearing( and not optional i think). LoadTableResponse.credentials are obtained driver-side and reach executors because RESTSessionCatalog.newFileIO calls
setCredentials(...)
and the FileIO is then broadcast. Refresh on the executor uses VendedCredentialsProvider built from broadcast
properties
.

3. Multi-catalog. This change keys broadcasts by catalog name, so per-catalog isolation works. #4309's SPI has no catalog identifier, so two catalogs sharing a vendor class silently collide on one cached instance — fixable though.

4. #4309 requires every vendor to ship Comet-Spark plumbing. A REST-catalog vendor would need to redo much of what this change does i guess

Curious what you think.

@comphead comphead changed the title feat: Credential provioder support feat: Credential provider support May 15, 2026
@mbutrovich
Copy link
Copy Markdown
Contributor

Thanks for the careful citations @karuppayya, those helped a lot.

I'm trying to figure out whether the broadcast and refresh machinery in this PR is something Comet should own, or whether a vendor implementation under #4309's SPI could give you the same end result with less Comet-side surface area. I'd love your read on that.

One scope thing I want to flag up front: we also need this for raw Parquet on S3, and any other native S3 reader Comet grows. Iceberg-vended creds are one important case, but vendors with custom signers or per-path STS for plain Parquet hit the same wall (Hadoop S3A signers don't surface the secret, AWSCredentialsProvider.getCredentials() is parameterless). The SPI shape in this PR is fairly Iceberg-shaped (named CometCredentialProvider under org.apache.comet.iceberg, initialize(catalogProperties) keyed off the catalog property bag, ResolveContext(tableLocation)), and it's not obvious how the Parquet path fits since there's no catalog or table location. #4309 covers both paths through one SPI under org.apache.comet.cloud.s3 with a per-call (bucket, path, mode) context that works for either, and I'd want whatever we land to keep that breadth.

On the REST-vended credentials point, the reason #4309 today can't reach credentials.uri and friends is that CometScanRule.filterStorageProperties strips them down to s3./gcs./adls./client. prefixes before they cross JNI. If we relax that filter so the full FileIO property map crosses as catalog_properties, and we only narrow to storage-prefix keys at the iceberg-rust FileIOBuilder.with_prop call site, then a vendor under #4309 sees everything LoadTableResponse returned. That seems to obsolete the parallel allFileIOProperties field this PR adds, with a much smaller change.

On multi-catalog with shared FQCN, you're right that #4309 has the gap today. I think it's fixable inside the existing SPI shape by extending CometS3CredentialDispatcher to cache by (FQCN, catalog id) and adding initialize(Map<String,String> catalogProperties) to CometS3CredentialProvider, called once per catalog on first instantiation. That mirrors S3FileIO.initialize, which is the shape vendors already write against for Iceberg.

What I'm less sure should live in Comet is refresh and distribution. The Rust-side TTL cache, the per-plan single-catalog invariant that throws on multi-catalog plans, and the driver-side broadcast cache feel like things Iceberg's own CachedSupplier / VendedCredentialsProvider and Spark's broadcast already cover, just inside the vendor's implementation rather than in Comet. Concretely, an Iceberg REST vended-credential CometS3CredentialProvider would look roughly like this:

public class IcebergRESTVendedS3Provider implements CometS3CredentialProvider {
  private volatile VendedCredentialsProvider provider;

  @Override
  public void initialize(Map<String, String> catalogProperties) {
    this.provider = VendedCredentialsProvider.create(catalogProperties);
  }

  @Override
  public CometS3Credentials getCredentialsForPath(
      String bucket, String path, CometS3AccessMode mode) {
    AwsSessionCredentials c = (AwsSessionCredentials) provider.resolveCredentials();
    long expiresAt = c.expirationTime().map(Instant::toEpochMilli).orElse(-1L);
    return new CometS3Credentials(
        c.accessKeyId(), c.secretAccessKey(), c.sessionToken(), expiresAt);
  }
}

Iceberg's CachedSupplier inside VendedCredentialsProvider handles refresh and prefetch, so neither the vendor nor Comet has to reinvent it. We could ship something like this as a reference implementation in Comet alongside the SPI so REST users aren't writing it from scratch, while keeping caching/refresh/distribution policy out of Comet itself.

Does this shape cover what you need, or is there a scenario I'm missing where the vendor model in #4309 (extended with the unfiltered properties + initialize(Map)) can't reach? Want to make sure I'm not arguing against a real requirement.

mbutrovich added a commit to mbutrovich/datafusion-comet that referenced this pull request May 18, 2026
@mbutrovich
Copy link
Copy Markdown
Contributor

Wanted to flag that the conversation we had on this PR shaped a few changes I just added in #4309. None of these are Comet adopting #4335 wholesale, but each one came from a real concern you raised, and I want the trail to be visible.

REST vended creds reach the SPI. CometScanRule used to run filterStorageProperties aggressively before stamping catalogProperties into the native scan op, so credentials.uri, OAuth tokens, and any vendor-custom keys never crossed JNI. I dropped the pre-filter and moved the storage-prefix narrowing to the native iceberg_scan.rs::load_file_io chokepoint where iceberg-rust's FileIOBuilder.with_prop actually requires it. The vendor SPI now sees the unfiltered FileIO bag through initialize(Map). Closes the same gap your allFileIOProperties field opened, without a parallel field on CometIcebergNativeScanMetadata.

Multi-catalog isolation under shared FQCN. Added default initialize(Map<String,String> catalogProperties) to CometS3CredentialProvider and changed CometS3CredentialDispatcher to key its instance cache by (FQCN, dispatchKey). On the Iceberg path dispatchKey is the Spark V2 catalog name (extracted via IcebergReflection.deriveCatalogName, which calls Table.name() and intersects against CatalogManager.listCatalogs(None) so a stray dot can't fake a catalog name); on the Parquet path it's the bucket. Two catalogs sharing one provider FQCN now get isolated provider instances with their own initialize maps. There is an end-to-end test in CometS3CredentialBridgeSuite that configures iso_a and iso_b with the same vendor class and asserts the dispatcher created two distinct instances.

Reference REST vended-creds provider. Added IcebergRESTVendedS3Provider under spark/src/test, ~30 lines wrapping Iceberg's VendedCredentialsProvider. Caching and refresh-near-expiry come from the AWS SDK's CachedSupplier inside VendedCredentialsProvider, so neither Comet nor the vendor reinvents either. Lives in test scope to keep iceberg-aws and AWS SDK v2 off Comet's runtime classpath; a follow-up can promote it to a runtime artifact behind an optional dep.

What I deliberately did not pull over. No Rust-side TTL cache, no driver-side broadcast cache for the credential provider, no plan walker that throws on multi-catalog plans. Caching policy stays with the vendor (CachedSupplier for Iceberg-REST vendors; whatever cache fits for custom STS), distribution stays with the vendor too (read from initialize's catalogProperties, or run a Spark broadcast inside the vendor's class), and expirationEpochMillis on the SPI return is metadata pass-through to object_store/opendal, which already have their own credential caches.

@parthchandra
Copy link
Copy Markdown
Contributor

Just wanted to add my 2 bits to the credentials refreshing bit.

The credentials providers are going to be executed on each executor and each executor will essentially request credentials at the same time. When running on a very large scale, this has been seen to sometimes overwhelm credentials backends leading to system-wide job failure. So caching the credentials at the executors makes sense, but it is generally better to refresh centrally and distribute the credentials.

It makes sense for the engine to do the refresh. For instance, in Spark, Kerberos delegation tokens are managed by Spark centrally in DelegationTokenManager

This does open the question of secure distribution of the credentials. Broadcast on an insecure channel will not do. The credentials distribution needs TLS.

@karuppayya
Copy link
Copy Markdown
Contributor Author

Thanks @mbutrovich for addressing my concerns. The Rust-side TTL is a JNI micro-optimization — if the switch between JVM and native isn't expensive, we can leave it out.

@parthchandra — fair point on the herd. I don't think we should go with centralized refresh though:

  • A driver-scheduled refresh becomes its own bottleneck, especially under Spark Connect where the Connect server is already a single point handling client RPCs, planning, and catalog interactions. Pushing per-executor credential refresh through it risks hitting networking limits before the credential backend does.
  • Iceberg REST's VendedCredentialsProvider already takes the distributed approach — executors refresh against the catalog endpoint directly, independent of the driver.
  • Vendor-side caching (both server and client) collapses(ideally) N same-scoped requests to ~1 upstream call, so total upstream request count stays close to centralized refresh in the common case.

Happy to close this PR in favor of #4309 and continue review on the other — we need this change soon. 🙂

@mbutrovich
Copy link
Copy Markdown
Contributor

Happy to close this PR in favor of #4309 and continue review on the other — we need this change soon. 🙂

Would love your feedback on the latest changes on #4309, especially if you can test with the example Iceberg implementation I included.

@karuppayya
Copy link
Copy Markdown
Contributor Author

@mbutrovich I will test this change today.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Credential Provider Support

4 participants