AWS IAM OAUTHBEARER support via optional oauthbearer-aws module#2267
AWS IAM OAUTHBEARER support via optional oauthbearer-aws module#2267pranav shah (prashah-confluent) wants to merge 33 commits into
oauthbearer-aws module#2267Conversation
… subject extractor
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
oauthbearer-aws module
Emanuele Sabellico (emasab)
left a comment
There was a problem hiding this comment.
Thanks, a few comments:
There was a problem hiding this comment.
You can make the whole oauthbearer package internal, so these files become like:
src/confluent_kafka/_oauthbearer/aws/aws_iam_marker.py
given the user doesn't have to access these modules directly.
There was a problem hiding this comment.
Made oauthbearer package as internal.
| if (cause_value) { | ||
| Py_INCREF(cause_value); | ||
| PyException_SetCause(new_exc, cause_value); | ||
| } |
There was a problem hiding this comment.
Removing the cause will allow to hide the third party dependency from the message and avoid they're installed manually instead of with the optional dependency.
| if (cause_value) { | |
| Py_INCREF(cause_value); | |
| PyException_SetCause(new_exc, cause_value); | |
| } |
There was a problem hiding this comment.
Removed the cause to hide third party dependency from message.
| OAUTHBEARER authentication via AWS STS `GetWebIdentityToken`. Activate by setting `sasl.oauthbearer.method=oidc`, | ||
| `sasl.oauthbearer.metadata.authentication.type=aws_iam`, and | ||
| `sasl.oauthbearer.config="region=...,audience=..."`. See [`examples/oauth_oidc_ccloud_aws_iam.py`](examples/oauth_oidc_ccloud_aws_iam.py) | ||
| for a worked example. |
There was a problem hiding this comment.
Also add a link to the documentation Markdown file here. First create it in reStructuredText in docs folder, similar to kip-848-migration-guide.rst, add it to the rst2md Makefile target and then commit both and include the .rst file from docs/index.rst
There was a problem hiding this comment.
Created and added the documentation for oauthbearer-aws.md
|




What
Adds AWS IAM-based authentication to Kafka clients via the OAUTHBEARER SASL mechanism, delivered as a new optional install extra
confluent-kafka[oauthbearer-aws]. The extra mints short-lived JWTs through AWS STSGetWebIdentityToken(viaboto3) and hands them to librdkafka as the SASL bearer credential.Activation is config-only. Users install the extra and set three keys — no code changes at the integration site:
Users who don't install the extra see zero AWS dependencies (
boto3) in their dependency graph — theconfluent_kafka.oauthbearer.awsmodules ship in the wheel regardless, butimport boto3is the runtime gate, so nothing AWS loads unless the extra is present and the marker is set.Implementation highlights:
oauthbearer-aws(single wheel + PEP 621 extra — the Python idiom, vs .NET's separate NuGet package). Pulls inboto3(≥ the first release exposing STSget_web_identity_token).sasl.oauthbearer.metadata.authentication.type=aws_iam. Python config is string-keyed dicts.resolve_aws_oauthbearer_marker()incommon_conf_setup()(src/confluent_kafka/src/confluent_kafka.c) fires on everyProducer/Consumer/AdminClientconstruction (and transitivelyAIOProducer/AIOConsumer, which wrap the sync clients). On theaws_iammarker it lazy-imports the optionalconfluent_kafka.oauthbearer.aws.aws_autowire, callscreate_handler(...), and registers the returned callable as librdkafka's OIDCoauth_cb. Single chokepoint; no hard reference from core to boto3 (the import is lazy and gated on the marker).key=valueinsasl.oauthbearer.config; supportsregion,audience(required),duration_seconds,signing_algorithm(ES384/RS256),sts_endpoint,principal_name,aws_debug(Pythonicnone|consolesubset →boto3.set_stream_logger('botocore', DEBUG)), and repeatabletag_<name>JWT-claim entries (max 50). Full parity with the .NET grammar.sasl.oauthbearer.extensionsconfig key (comma-separatedkey=value), matching the convention used by the AzureIMDS path across the bindings.method=oidcenforced, friendly errors that name the missing key — and anImportErrornamingpip install 'confluent-kafka[oauthbearer-aws]'when the marker is set but the extra isn't installed (never a rawModuleNotFoundError: boto3)._kv_string_parserfor the whitespace/equals grammar, placed inconfluent_kafkacore (mirrors .NET'sKvStringParserplacement)._aws_oauthbearer_config.py(parser),_aws_sts_token_provider.py(STS call → token mapping),_aws_jwt_subject_extractor.py,_aws_sasl_extensions_parser.py,_aws_iam_marker.py(constants). Onlyaws_autowire.create_handleris public.examples/oauth_oidc_ccloud_aws_iam.py.Companion librdkafka changes are required for this PR to function end-to-end:
aws_iamvalue tosasl.oauthbearer.metadata.authentication.type's enumtoken.endpoint.urlcheck foraws_iam(parallel toazure_imds)The dispatcher passes the
aws_iammarker straight through to librdkafka (which recognizes it natively, bypasses the OIDCtoken.endpoint.url/grant-type checks, and uses our registeredoauth_cb), so an AWS-IAM-aware librdkafka is required. When the changes ship to a stablelibrdkafkarelease, bumpLIBRDKAFKA_VERSIONto it as a follow-up.Checklist
New optional extra, new public config keys (
method/ marker /config/extensions), new example. Not a breaking change — all additions are gated on theaws_iammarker (marker absent → the dispatcher is a no-op; opt-out installs pull zero AWS dependencies).209 tests under
tests/oauthbearer/aws/(9 files): config parser, STS token provider, JWT subject extractor, SASL-extensions parser, the C dispatcher (via Producer/Consumer/AdminClient construction), thecreate_handlerautowire entry point, a cross-language contract test, and a C↔Python marker-constant drift guard — of which 12 are real-STS integration tests (test_real_sts.py, skipped unless AWS credentials are present).References
JIRA: INIT-14269 (cross-language initiative; swap for the Python sub-task if one exists)
Companion librdkafka PR: confluentinc/librdkafka#5428
Test & Review
Automated:
Manual Testing:
Reviewer entry points:
examples/oauth_oidc_ccloud_aws_iam.py— the public-facing config surface and a worked example.src/confluent_kafka/oauthbearer/aws/aws_autowire.py— the staticcreate_handler()entry point + marker constants (_aws_iam_marker.py).src/confluent_kafka/src/confluent_kafka.c→resolve_aws_oauthbearer_marker()— the C-extension dispatcher incommon_conf_setup()(the core's marker detection + lazy autowire).src/confluent_kafka/oauthbearer/aws/_aws_sts_token_provider.py— the STS call and response → token mapping.Open questions / Follow-ups