Skip to content

fix(ingestion): refresh RDS IAM auth token per connection for MySQL#28730

Open
harshsoni2024 wants to merge 1 commit into
mainfrom
refresh_iam_auth_token
Open

fix(ingestion): refresh RDS IAM auth token per connection for MySQL#28730
harshsoni2024 wants to merge 1 commit into
mainfrom
refresh_iam_auth_token

Conversation

@harshsoni2024
Copy link
Copy Markdown
Contributor

@harshsoni2024 harshsoni2024 commented Jun 5, 2026

Problem

Fix: #28686

  • MySQL ingestion against AWS RDS using IAM authentication fails partway
    through long runs with Access denied.

  • The IAM auth token is generated once, at engine-build time, and baked into
    the SQLAlchemy connection URL (get_password_secretget_connection_url_common
    in builders.py). RDS IAM tokens expire after ~15 minutes, and the token is
    never refreshed for the lifetime of the engine. Because the engine is created with
    max_overflow=-1 (unlimited overflow connections) and ingestion is multi-threaded,
    any new pooled connection opened after the token expires — large schemas, the
    profiler, reconnects — authenticates with a stale credential and dies mid-run.

  • The failure is time-dependent and intermittent: a small/single-connection run can
    reuse one still-valid connection and pass, which masks the bug; a large catalog
    reliably hits it.

Fix

  • clients/aws_client.py — add RdsIamAuthTokenManager: generates the RDS IAM
    token, derives expiry from the presigned-URL params (X-Amz-Date / X-Amz-Expires),
    caches it, and regenerates shortly before expiry. Falls back to a conservative TTL
    if the token can't be parsed so it still refreshes rather than living forever.
  • source/database/mysql/connection.py — route IamAuthConfigurationSource to a
    new _get_iam_engine that:
    • builds the engine with no token in the URL (scheme://user@host:port), and
    • attaches a SQLAlchemy do_connect event listener that injects a fresh token on
      every connection
      and enables SSL (PyMySQL requires SSL for RDS IAM), preserving
      any existing SSL config.

This keeps the connector-specific wiring in the MySQL connection handler while the
reusable token manager lives with AWSClient.

Scope / follow-up

Scoped to MySQL. The shared builders.py IAM path remains token-frozen for the
other RDS connectors that go through it (Postgres, Redshift, Greenplum, Timescale).
tests/unit/connections/test_iam_token_refresh.py documents that remaining gap and is
set up to flip to green once the shared path adopts RdsIamAuthTokenManager.

Tests

  • tests/unit/source/database/test_mysql_iam.py (new):
    • Connector: token not baked into the URL, do_connect listener registered,
      fresh token minted per connection, SSL enabled, existing SSL preserved,
      host/port split correctly.
    • Token manager: first call generates, cached within TTL, refreshed after
      expiry
      , expiry parsed from the presigned URL, malformed-token fallback.
  • tests/unit/connections/test_iam_token_refresh.py (new): characterizes the shared
    builders.py IAM path as the remaining gap for the non-MySQL RDS connectors.

RDS IAM auth tokens expire after ~15 minutes. The token was generated
once in get_connection_url_common and baked into the SQLAlchemy URL, so
a single engine reused one frozen token for every pooled connection and
never refreshed it. With max_overflow=-1 and multi-threaded extraction,
any connection opened after the 15-minute TTL — common on large catalogs
and during profiling — authenticated with a stale token and failed mid
run with "Access denied".

Add RdsIamAuthTokenManager (caches the token, parses its expiry from the
presigned-URL params, refreshes before it lapses) and wire a do_connect
event listener in the MySQL connection handler that injects a fresh token
on every new connection instead of embedding it in the URL. SSL is forced
on for IAM (PyMySQL requires it) while preserving any existing SSL config.

Scoped to the MySQL connector. The shared builders.py IAM path is still
token-frozen for the other RDS connectors (Postgres, Redshift, Greenplum,
Timescale); the reusable token manager lives in aws_client.py so they can
adopt it next.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings June 5, 2026 05:49
@harshsoni2024 harshsoni2024 requested a review from a team as a code owner June 5, 2026 05:49
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Jun 5, 2026

❌ PR checklist incomplete

This PR cannot be merged until the following are addressed on its linked issue:

The fields live on the linked issue in the Shipping project (open the issue → right sidebar → Projects). After you set them, re-run this check (or push a commit) — issue/project changes do not re-trigger it automatically.

Maintainers can bypass this check by adding the skip-pr-checks label.

@github-actions github-actions Bot added Ingestion safe to test Add this label to run secure Github workflows on PRs labels Jun 5, 2026
Comment on lines +300 to +314
def get_token(self) -> str:
if self._needs_refresh():
self._refresh_token()
if self._token is None:
raise RuntimeError("Failed to generate RDS IAM authentication token")
return self._token

def _needs_refresh(self) -> bool:
needs_refresh = True
if self._token is not None and self._expires_at is not None:
time_left = self._expires_at - datetime.datetime.now(datetime.timezone.utc)
needs_refresh = time_left <= self.refresh_threshold
return needs_refresh

def _refresh_token(self) -> None:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Bug: RdsIamAuthTokenManager token cache is not thread-safe

RdsIamAuthTokenManager.get_token() is invoked from the SQLAlchemy do_connect listener registered in _get_iam_engine. The PR's own description notes the engine is built with max_overflow=-1 and ingestion is multi-threaded, so do_connect (and thus get_token) fires concurrently from many threads. get_token -> _needs_refresh -> _refresh_token mutate the shared self._token / self._expires_at with no lock.

Consequences:

  • When the token nears expiry, multiple threads can simultaneously evaluate _needs_refresh() as True and each call _refresh_token(), issuing redundant generate_db_auth_token calls (each spins up a fresh AWSClient/boto3 client).
  • _refresh_token updates _token and _expires_at non-atomically; a concurrent reader can observe a new _token paired with a stale _expires_at (or vice-versa), undermining the refresh accounting this manager exists to guarantee.

Guard the read-check-refresh sequence with a threading.Lock so refresh happens once and state is updated atomically.

Serialize the check-and-refresh under a lock so only one thread regenerates the token and state updates are atomic.:

import threading

    def __init__(self, ...):
        ...
        self._token: Optional[str] = None
        self._expires_at: Optional[datetime.datetime] = None
        self._lock = threading.Lock()

    def get_token(self) -> str:
        with self._lock:
            if self._needs_refresh():
                self._refresh_token()
            if self._token is None:
                raise RuntimeError("Failed to generate RDS IAM authentication token")
            return self._token
  • Apply fix

Check the box to apply the fix or reply for a change | Was this helpful? React with 👍 / 👎

if auth_type.awsConfig is None:
raise ValueError("awsConfig is required for MySQL RDS IAM authentication")

host, port = connection.hostPort.split(":")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Edge Case: hostPort split and unencoded username are fragile in _get_iam_engine

In _get_iam_engine, host, port = connection.hostPort.split(":") will raise ValueError if hostPort lacks a port or contains more than one colon (e.g. an IPv6 literal or an accidental trailing colon), producing an opaque unpacking error instead of a clear configuration message. Additionally, base_url = f"{scheme}://{connection.username}@{connection.hostPort}" interpolates the username without URL-encoding; usernames containing reserved characters (@, /, :) would corrupt the URL. Consider rsplit(":", 1) with a validation/error message and urllib.parse.quote for the username.

Validate host/port parsing and URL-encode the username.:

host, _, port = connection.hostPort.rpartition(":")
if not host or not port:
    raise ValueError(f"Invalid hostPort for MySQL RDS IAM: {connection.hostPort!r}")
...
from urllib.parse import quote
base_url = f"{scheme}://{quote(connection.username)}@{connection.hostPort}"
  • Apply fix

Check the box to apply the fix or reply for a change | Was this helpful? React with 👍 / 👎

@gitar-bot
Copy link
Copy Markdown

gitar-bot Bot commented Jun 5, 2026

Code Review ⚠️ Changes requested 0 resolved / 2 findings

Implements a per-connection RDS IAM token refresh mechanism for MySQL to prevent auth failures during long-running ingestion. The PR is currently blocked by a thread-safety issue in the RdsIamAuthTokenManager and fragile connection string parsing logic.

⚠️ Bug: RdsIamAuthTokenManager token cache is not thread-safe

📄 ingestion/src/metadata/clients/aws_client.py:300-314 📄 ingestion/src/metadata/ingestion/source/database/mysql/connection.py:116-120

RdsIamAuthTokenManager.get_token() is invoked from the SQLAlchemy do_connect listener registered in _get_iam_engine. The PR's own description notes the engine is built with max_overflow=-1 and ingestion is multi-threaded, so do_connect (and thus get_token) fires concurrently from many threads. get_token -> _needs_refresh -> _refresh_token mutate the shared self._token / self._expires_at with no lock.

Consequences:

  • When the token nears expiry, multiple threads can simultaneously evaluate _needs_refresh() as True and each call _refresh_token(), issuing redundant generate_db_auth_token calls (each spins up a fresh AWSClient/boto3 client).
  • _refresh_token updates _token and _expires_at non-atomically; a concurrent reader can observe a new _token paired with a stale _expires_at (or vice-versa), undermining the refresh accounting this manager exists to guarantee.

Guard the read-check-refresh sequence with a threading.Lock so refresh happens once and state is updated atomically.

Serialize the check-and-refresh under a lock so only one thread regenerates the token and state updates are atomic.
import threading

    def __init__(self, ...):
        ...
        self._token: Optional[str] = None
        self._expires_at: Optional[datetime.datetime] = None
        self._lock = threading.Lock()

    def get_token(self) -> str:
        with self._lock:
            if self._needs_refresh():
                self._refresh_token()
            if self._token is None:
                raise RuntimeError("Failed to generate RDS IAM authentication token")
            return self._token
💡 Edge Case: hostPort split and unencoded username are fragile in _get_iam_engine

📄 ingestion/src/metadata/ingestion/source/database/mysql/connection.py:101 📄 ingestion/src/metadata/ingestion/source/database/mysql/connection.py:109

In _get_iam_engine, host, port = connection.hostPort.split(":") will raise ValueError if hostPort lacks a port or contains more than one colon (e.g. an IPv6 literal or an accidental trailing colon), producing an opaque unpacking error instead of a clear configuration message. Additionally, base_url = f"{scheme}://{connection.username}@{connection.hostPort}" interpolates the username without URL-encoding; usernames containing reserved characters (@, /, :) would corrupt the URL. Consider rsplit(":", 1) with a validation/error message and urllib.parse.quote for the username.

Validate host/port parsing and URL-encode the username.
host, _, port = connection.hostPort.rpartition(":")
if not host or not port:
    raise ValueError(f"Invalid hostPort for MySQL RDS IAM: {connection.hostPort!r}")
...
from urllib.parse import quote
base_url = f"{scheme}://{quote(connection.username)}@{connection.hostPort}"
🤖 Prompt for agents
Code Review: Implements a per-connection RDS IAM token refresh mechanism for MySQL to prevent auth failures during long-running ingestion. The PR is currently blocked by a thread-safety issue in the `RdsIamAuthTokenManager` and fragile connection string parsing logic.

1. ⚠️ Bug: RdsIamAuthTokenManager token cache is not thread-safe
   Files: ingestion/src/metadata/clients/aws_client.py:300-314, ingestion/src/metadata/ingestion/source/database/mysql/connection.py:116-120

   `RdsIamAuthTokenManager.get_token()` is invoked from the SQLAlchemy `do_connect` listener registered in `_get_iam_engine`. The PR's own description notes the engine is built with `max_overflow=-1` and ingestion is multi-threaded, so `do_connect` (and thus `get_token`) fires concurrently from many threads. `get_token` -> `_needs_refresh` -> `_refresh_token` mutate the shared `self._token` / `self._expires_at` with no lock.
   
   Consequences:
   - When the token nears expiry, multiple threads can simultaneously evaluate `_needs_refresh()` as True and each call `_refresh_token()`, issuing redundant `generate_db_auth_token` calls (each spins up a fresh `AWSClient`/boto3 client).
   - `_refresh_token` updates `_token` and `_expires_at` non-atomically; a concurrent reader can observe a new `_token` paired with a stale `_expires_at` (or vice-versa), undermining the refresh accounting this manager exists to guarantee.
   
   Guard the read-check-refresh sequence with a `threading.Lock` so refresh happens once and state is updated atomically.

   Fix (Serialize the check-and-refresh under a lock so only one thread regenerates the token and state updates are atomic.):
   import threading
   
       def __init__(self, ...):
           ...
           self._token: Optional[str] = None
           self._expires_at: Optional[datetime.datetime] = None
           self._lock = threading.Lock()
   
       def get_token(self) -> str:
           with self._lock:
               if self._needs_refresh():
                   self._refresh_token()
               if self._token is None:
                   raise RuntimeError("Failed to generate RDS IAM authentication token")
               return self._token

2. 💡 Edge Case: hostPort split and unencoded username are fragile in _get_iam_engine
   Files: ingestion/src/metadata/ingestion/source/database/mysql/connection.py:101, ingestion/src/metadata/ingestion/source/database/mysql/connection.py:109

   In `_get_iam_engine`, `host, port = connection.hostPort.split(":")` will raise `ValueError` if `hostPort` lacks a port or contains more than one colon (e.g. an IPv6 literal or an accidental trailing colon), producing an opaque unpacking error instead of a clear configuration message. Additionally, `base_url = f"{scheme}://{connection.username}@{connection.hostPort}"` interpolates the username without URL-encoding; usernames containing reserved characters (`@`, `/`, `:`) would corrupt the URL. Consider `rsplit(":", 1)` with a validation/error message and `urllib.parse.quote` for the username.

   Fix (Validate host/port parsing and URL-encode the username.):
   host, _, port = connection.hostPort.rpartition(":")
   if not host or not port:
       raise ValueError(f"Invalid hostPort for MySQL RDS IAM: {connection.hostPort!r}")
   ...
   from urllib.parse import quote
   base_url = f"{scheme}://{quote(connection.username)}@{connection.hostPort}"

Options

Display: compact → Showing less information.

Comment with these commands to change:

Compact
gitar display:verbose         

Was this helpful? React with 👍 / 👎 | Gitar

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes time-dependent failures in MySQL ingestion against AWS RDS using IAM authentication by ensuring an RDS IAM auth token is refreshed for each new SQLAlchemy connection (instead of being generated once and embedded into the engine URL).

Changes:

  • Added RdsIamAuthTokenManager to cache and refresh RDS IAM tokens based on presigned-token expiry.
  • Updated MySQL connection handling to build an IAM-specific engine and inject a fresh token via a SQLAlchemy do_connect listener (and enable SSL for PyMySQL).
  • Added unit tests covering MySQL IAM engine wiring and token-manager refresh behavior, plus characterization tests documenting the remaining shared builders.py gap.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.

File Description
ingestion/src/metadata/ingestion/source/database/mysql/connection.py Adds IAM-specific engine path with do_connect listener to inject fresh IAM token per connection.
ingestion/src/metadata/clients/aws_client.py Introduces RdsIamAuthTokenManager that parses token expiry and refreshes before expiry.
ingestion/tests/unit/source/database/test_mysql_iam.py New unit tests validating MySQL IAM engine URL behavior, listener registration, token injection, and SSL handling.
ingestion/tests/unit/connections/test_iam_token_refresh.py Characterization tests documenting current shared IAM URL behavior (token baked into URL) as a follow-up gap.

Comment on lines +108 to +114
scheme = connection.scheme.value if connection.scheme else "mysql+pymysql"
base_url = f"{scheme}://{connection.username}@{connection.hostPort}"
engine = create_generic_db_connection(
connection=connection,
get_connection_url_fn=lambda _: base_url,
get_connection_args_fn=get_connection_args_common,
)
Comment on lines +116 to +119
def inject_iam_token(_dialect, _conn_rec, _cargs, cparams: Dict[str, Any]): # noqa: UP006
cparams["password"] = token_manager.get_token()
cparams["ssl"] = cparams.get("ssl") or {"ssl": True}

Comment on lines +121 to +124
cparams = {}
listener(None, None, None, cparams)
assert cparams["ssl"]

Comment on lines +23 to +27
These tests pin the behaviour of the SHARED helper, which is still token-frozen
and is the remaining gap for the other RDS connectors that go through it
(Postgres, Redshift, Greenplum, Timescale). They are expected to FAIL — and
should be flipped to assert refreshed behaviour — once the shared path is fixed
too. ``test_iam_token_is_used_for_authentication`` is the invariant that holds
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Jun 5, 2026

🟡 Playwright Results — all passed (15 flaky)

✅ 4258 passed · ❌ 0 failed · 🟡 15 flaky · ⏭️ 88 skipped

Shard Passed Failed Flaky Skipped
🟡 Shard 1 297 0 2 4
🟡 Shard 2 802 0 1 9
✅ Shard 3 804 0 0 8
🟡 Shard 4 852 0 2 12
🟡 Shard 5 718 0 2 47
🟡 Shard 6 785 0 8 8
🟡 15 flaky test(s) (passed on retry)
  • Pages/AuditLogs.spec.ts › should apply both User and EntityType filters simultaneously (shard 1, 1 retry)
  • Pages/UserCreationWithPersona.spec.ts › Create user with persona and verify on profile (shard 1, 1 retry)
  • Features/Glossary/GlossaryWorkflow.spec.ts › should start term as Draft when glossary has reviewers (shard 2, 2 retries)
  • Flow/ObservabilityAlerts.spec.ts › Test Suite alert (shard 4, 1 retry)
  • Pages/CustomProperties.spec.ts › Date Time (shard 4, 1 retry)
  • Pages/Entity.spec.ts › User as Owner with unsorted list (shard 5, 1 retry)
  • Pages/ExplorePageRightPanel_KnowledgeCenter.spec.ts › Should remove user owner for knowledgeCenter (shard 5, 1 retry)
  • Pages/Lineage/DataAssetLineage.spec.ts › verify create lineage for entity - Data Model (shard 6, 1 retry)
  • Pages/Lineage/DataAssetLineage.spec.ts › Column lineage for topic -> dashboardDataModel (shard 6, 1 retry)
  • Pages/Lineage/DataAssetLineage.spec.ts › Column lineage for searchIndex -> dashboard (shard 6, 1 retry)
  • Pages/Lineage/LineageFilters.spec.ts › Verify lineage service type filter selection (shard 6, 1 retry)
  • Pages/Lineage/LineageFilters.spec.ts › Verify lineage schema filter selection (shard 6, 1 retry)
  • Pages/Lineage/LineageRightPanel.spec.ts › Verify custom properties tab IS visible for supported type: searchIndex (shard 6, 1 retry)
  • Pages/Lineage/PlatformLineage.spec.ts › Verify domain platform view (shard 6, 1 retry)
  • Pages/Login.spec.ts › Refresh should work (shard 6, 1 retry)

📦 Download artifacts

How to debug locally
# Download playwright-test-results-<shard> artifact and unzip
npx playwright show-trace path/to/trace.zip    # view trace

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Ingestion safe to test Add this label to run secure Github workflows on PRs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

bug(ingestion): RDS IAM auth token generated once and never refreshed. Long MySQL ingestions fail mid-run with "Access denied"

2 participants