fix(ingestion): refresh RDS IAM auth token per connection for MySQL#28730
fix(ingestion): refresh RDS IAM auth token per connection for MySQL#28730harshsoni2024 wants to merge 1 commit into
Conversation
RDS IAM auth tokens expire after ~15 minutes. The token was generated once in get_connection_url_common and baked into the SQLAlchemy URL, so a single engine reused one frozen token for every pooled connection and never refreshed it. With max_overflow=-1 and multi-threaded extraction, any connection opened after the 15-minute TTL — common on large catalogs and during profiling — authenticated with a stale token and failed mid run with "Access denied". Add RdsIamAuthTokenManager (caches the token, parses its expiry from the presigned-URL params, refreshes before it lapses) and wire a do_connect event listener in the MySQL connection handler that injects a fresh token on every new connection instead of embedding it in the URL. SSL is forced on for IAM (PyMySQL requires it) while preserving any existing SSL config. Scoped to the MySQL connector. The shared builders.py IAM path is still token-frozen for the other RDS connectors (Postgres, Redshift, Greenplum, Timescale); the reusable token manager lives in aws_client.py so they can adopt it next. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
❌ PR checklist incompleteThis PR cannot be merged until the following are addressed on its linked issue:
The fields live on the linked issue in the Shipping project (open the issue → right sidebar → Projects). After you set them, re-run this check (or push a commit) — issue/project changes do not re-trigger it automatically. Maintainers can bypass this check by adding the |
| def get_token(self) -> str: | ||
| if self._needs_refresh(): | ||
| self._refresh_token() | ||
| if self._token is None: | ||
| raise RuntimeError("Failed to generate RDS IAM authentication token") | ||
| return self._token | ||
|
|
||
| def _needs_refresh(self) -> bool: | ||
| needs_refresh = True | ||
| if self._token is not None and self._expires_at is not None: | ||
| time_left = self._expires_at - datetime.datetime.now(datetime.timezone.utc) | ||
| needs_refresh = time_left <= self.refresh_threshold | ||
| return needs_refresh | ||
|
|
||
| def _refresh_token(self) -> None: |
There was a problem hiding this comment.
⚠️ Bug: RdsIamAuthTokenManager token cache is not thread-safe
RdsIamAuthTokenManager.get_token() is invoked from the SQLAlchemy do_connect listener registered in _get_iam_engine. The PR's own description notes the engine is built with max_overflow=-1 and ingestion is multi-threaded, so do_connect (and thus get_token) fires concurrently from many threads. get_token -> _needs_refresh -> _refresh_token mutate the shared self._token / self._expires_at with no lock.
Consequences:
- When the token nears expiry, multiple threads can simultaneously evaluate
_needs_refresh()as True and each call_refresh_token(), issuing redundantgenerate_db_auth_tokencalls (each spins up a freshAWSClient/boto3 client). _refresh_tokenupdates_tokenand_expires_atnon-atomically; a concurrent reader can observe a new_tokenpaired with a stale_expires_at(or vice-versa), undermining the refresh accounting this manager exists to guarantee.
Guard the read-check-refresh sequence with a threading.Lock so refresh happens once and state is updated atomically.
Serialize the check-and-refresh under a lock so only one thread regenerates the token and state updates are atomic.:
import threading
def __init__(self, ...):
...
self._token: Optional[str] = None
self._expires_at: Optional[datetime.datetime] = None
self._lock = threading.Lock()
def get_token(self) -> str:
with self._lock:
if self._needs_refresh():
self._refresh_token()
if self._token is None:
raise RuntimeError("Failed to generate RDS IAM authentication token")
return self._token
- Apply fix
Check the box to apply the fix or reply for a change | Was this helpful? React with 👍 / 👎
| if auth_type.awsConfig is None: | ||
| raise ValueError("awsConfig is required for MySQL RDS IAM authentication") | ||
|
|
||
| host, port = connection.hostPort.split(":") |
There was a problem hiding this comment.
💡 Edge Case: hostPort split and unencoded username are fragile in _get_iam_engine
In _get_iam_engine, host, port = connection.hostPort.split(":") will raise ValueError if hostPort lacks a port or contains more than one colon (e.g. an IPv6 literal or an accidental trailing colon), producing an opaque unpacking error instead of a clear configuration message. Additionally, base_url = f"{scheme}://{connection.username}@{connection.hostPort}" interpolates the username without URL-encoding; usernames containing reserved characters (@, /, :) would corrupt the URL. Consider rsplit(":", 1) with a validation/error message and urllib.parse.quote for the username.
Validate host/port parsing and URL-encode the username.:
host, _, port = connection.hostPort.rpartition(":")
if not host or not port:
raise ValueError(f"Invalid hostPort for MySQL RDS IAM: {connection.hostPort!r}")
...
from urllib.parse import quote
base_url = f"{scheme}://{quote(connection.username)}@{connection.hostPort}"
- Apply fix
Check the box to apply the fix or reply for a change | Was this helpful? React with 👍 / 👎
Code Review
|
| Compact |
|
Was this helpful? React with 👍 / 👎 | Gitar
There was a problem hiding this comment.
Pull request overview
This PR fixes time-dependent failures in MySQL ingestion against AWS RDS using IAM authentication by ensuring an RDS IAM auth token is refreshed for each new SQLAlchemy connection (instead of being generated once and embedded into the engine URL).
Changes:
- Added
RdsIamAuthTokenManagerto cache and refresh RDS IAM tokens based on presigned-token expiry. - Updated MySQL connection handling to build an IAM-specific engine and inject a fresh token via a SQLAlchemy
do_connectlistener (and enable SSL for PyMySQL). - Added unit tests covering MySQL IAM engine wiring and token-manager refresh behavior, plus characterization tests documenting the remaining shared
builders.pygap.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| ingestion/src/metadata/ingestion/source/database/mysql/connection.py | Adds IAM-specific engine path with do_connect listener to inject fresh IAM token per connection. |
| ingestion/src/metadata/clients/aws_client.py | Introduces RdsIamAuthTokenManager that parses token expiry and refreshes before expiry. |
| ingestion/tests/unit/source/database/test_mysql_iam.py | New unit tests validating MySQL IAM engine URL behavior, listener registration, token injection, and SSL handling. |
| ingestion/tests/unit/connections/test_iam_token_refresh.py | Characterization tests documenting current shared IAM URL behavior (token baked into URL) as a follow-up gap. |
| scheme = connection.scheme.value if connection.scheme else "mysql+pymysql" | ||
| base_url = f"{scheme}://{connection.username}@{connection.hostPort}" | ||
| engine = create_generic_db_connection( | ||
| connection=connection, | ||
| get_connection_url_fn=lambda _: base_url, | ||
| get_connection_args_fn=get_connection_args_common, | ||
| ) |
| def inject_iam_token(_dialect, _conn_rec, _cargs, cparams: Dict[str, Any]): # noqa: UP006 | ||
| cparams["password"] = token_manager.get_token() | ||
| cparams["ssl"] = cparams.get("ssl") or {"ssl": True} | ||
|
|
| cparams = {} | ||
| listener(None, None, None, cparams) | ||
| assert cparams["ssl"] | ||
|
|
| These tests pin the behaviour of the SHARED helper, which is still token-frozen | ||
| and is the remaining gap for the other RDS connectors that go through it | ||
| (Postgres, Redshift, Greenplum, Timescale). They are expected to FAIL — and | ||
| should be flipped to assert refreshed behaviour — once the shared path is fixed | ||
| too. ``test_iam_token_is_used_for_authentication`` is the invariant that holds |
🟡 Playwright Results — all passed (15 flaky)✅ 4258 passed · ❌ 0 failed · 🟡 15 flaky · ⏭️ 88 skipped
🟡 15 flaky test(s) (passed on retry)
How to debug locally# Download playwright-test-results-<shard> artifact and unzip
npx playwright show-trace path/to/trace.zip # view trace |
Problem
Fix: #28686
MySQL ingestion against AWS RDS using IAM authentication fails partway
through long runs with
Access denied.The IAM auth token is generated once, at engine-build time, and baked into
the SQLAlchemy connection URL (
get_password_secret→get_connection_url_commonin
builders.py). RDS IAM tokens expire after ~15 minutes, and the token isnever refreshed for the lifetime of the engine. Because the engine is created with
max_overflow=-1(unlimited overflow connections) and ingestion is multi-threaded,any new pooled connection opened after the token expires — large schemas, the
profiler, reconnects — authenticates with a stale credential and dies mid-run.
The failure is time-dependent and intermittent: a small/single-connection run can
reuse one still-valid connection and pass, which masks the bug; a large catalog
reliably hits it.
Fix
clients/aws_client.py— addRdsIamAuthTokenManager: generates the RDS IAMtoken, derives expiry from the presigned-URL params (
X-Amz-Date/X-Amz-Expires),caches it, and regenerates shortly before expiry. Falls back to a conservative TTL
if the token can't be parsed so it still refreshes rather than living forever.
source/database/mysql/connection.py— routeIamAuthConfigurationSourceto anew
_get_iam_enginethat:scheme://user@host:port), anddo_connectevent listener that injects a fresh token onevery connection and enables SSL (PyMySQL requires SSL for RDS IAM), preserving
any existing SSL config.
This keeps the connector-specific wiring in the MySQL connection handler while the
reusable token manager lives with
AWSClient.Scope / follow-up
Scoped to MySQL. The shared
builders.pyIAM path remains token-frozen for theother RDS connectors that go through it (Postgres, Redshift, Greenplum, Timescale).
tests/unit/connections/test_iam_token_refresh.pydocuments that remaining gap and isset up to flip to green once the shared path adopts
RdsIamAuthTokenManager.Tests
tests/unit/source/database/test_mysql_iam.py(new):do_connectlistener registered,fresh token minted per connection, SSL enabled, existing SSL preserved,
host/port split correctly.
expiry, expiry parsed from the presigned URL, malformed-token fallback.
tests/unit/connections/test_iam_token_refresh.py(new): characterizes the sharedbuilders.pyIAM path as the remaining gap for the non-MySQL RDS connectors.