From aaad5b1f46562e18795027710ad6be81e5f20271 Mon Sep 17 00:00:00 2001 From: Snehil Kishore Date: Thu, 4 Jun 2026 21:07:41 +0530 Subject: [PATCH 1/3] PoC: enforce upstream IdP session_expiry ceiling for enterprise connections --- README.md | 6 + examples/RetrievingData.md | 33 +++ .../auth_server/server_client.py | 48 +++- .../auth_types/__init__.py | 6 + src/auth0_server_python/error/__init__.py | 1 + .../tests/test_server_client.py | 207 +++++++++++++++++- src/auth0_server_python/utils/helpers.py | 52 ++++- 7 files changed, 350 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index ff5e46c..0c63cfe 100644 --- a/README.md +++ b/README.md @@ -173,6 +173,12 @@ The SDK handles per-domain OIDC discovery, JWKS fetching, issuer validation, and For more details and examples, see [examples/MultipleCustomDomains.md](examples/MultipleCustomDomains.md). +### 6. Session Expiry from the Upstream IdP + +For enterprise connections, the upstream identity provider can cap how long a user's session lives. When the connection is configured to honor it, Auth0 includes a `session_expiry` claim in the ID token, and the SDK enforces this ceiling on every session read. Once it is reached, `get_user()` and `get_session()` return `None`, and `get_access_token()` raises an `AccessTokenError` with code `session_expired`. + +For more details and examples, see [examples/RetrievingData.md](examples/RetrievingData.md#session-expiry-from-the-upstream-idp). + ## Feedback ### Contributing diff --git a/examples/RetrievingData.md b/examples/RetrievingData.md index 88fb610..221c1a8 100644 --- a/examples/RetrievingData.md +++ b/examples/RetrievingData.md @@ -70,6 +70,39 @@ access_token = await server_client.get_access_token(store_options=store_options) Read more above in [Configuring the Store](./ConfigureStore.md). +## Session Expiry from the Upstream IdP + +For enterprise connections, the upstream identity provider can impose a ceiling on how long the user's session may live. When the connection is configured to honor it, Auth0 includes a `session_expiry` claim (an absolute Unix timestamp, in seconds) in the ID token. The SDK reads this value at login, stores it with the session, and enforces it on every subsequent read. + +Once the ceiling is reached, the read methods behave as follows: + +- `get_user()` returns `None`, as if no session exists. +- `get_session()` returns `None`, as if no session exists. +- `get_access_token()` raises an `AccessTokenError` with code `session_expired`. + +```python +from auth0_server_python.error import AccessTokenError, AccessTokenErrorCode + +try: + access_token = await server_client.get_access_token(store_options=store_options) +except AccessTokenError as error: + if error.code == AccessTokenErrorCode.SESSION_EXPIRED: + # The upstream session ceiling has been reached; start a new login. + ... +``` + +When the ceiling is reached, the SDK deletes the stored session before returning, so the next request starts clean. + +The `session_expiry` value is also surfaced through the user claims, so you can read it without triggering enforcement: + +```python +user = await server_client.get_user(store_options=store_options) +session_expires_at = (user or {}).get("session_expiry") +``` + +> [!NOTE] +> Enforcement applies a small negative leeway (30 seconds) to account for clock skew, so a session is treated as expired slightly before the exact `session_expiry` timestamp. The refresh-token grant preserves the original ceiling - refreshing an access token does not extend the upstream session. + ## Multi-Resource Refresh Tokens (MRRT) Multi-Resource Refresh Tokens allow using a single refresh token to obtain access tokens for multiple audiences, simplifying token management in applications that interact with multiple backend services. diff --git a/src/auth0_server_python/auth_server/server_client.py b/src/auth0_server_python/auth_server/server_client.py index 91de45d..0ba698b 100644 --- a/src/auth0_server_python/auth_server/server_client.py +++ b/src/auth0_server_python/auth_server/server_client.py @@ -638,9 +638,14 @@ async def complete_interactive_login( user_info = token_response.get("userinfo") user_claims = None id_token = token_response.get("id_token") + # IPSIE session_expiry ceiling, read from the verified ID token claims. + session_expires_at = None if user_info: user_claims = UserClaims.parse_obj(user_info) + # authlib populates `userinfo` from parsed ID token claims, so the + # IPSIE session_expiry claim may surface here. + session_expires_at = State.extract_session_expiry(user_info) elif id_token: # Fetch JWKS for signature verification jwks = await self._get_jwks_cached(origin_domain, metadata) @@ -657,6 +662,8 @@ async def complete_interactive_login( raise IssuerValidationError("ID token issuer mismatch. Ensure your Auth0 domain is configured correctly.") user_claims = UserClaims.parse_obj(claims) + # IPSIE session_expiry ceiling from the verified ID token. + session_expires_at = State.extract_session_expiry(claims) except ValueError as e: raise ApiError("jwks_key_not_found", str(e)) except jwt.InvalidSignatureError as e: @@ -708,7 +715,8 @@ async def complete_interactive_login( domain=origin_domain, internal={ "sid": sid, - "created_at": int(time.time()) + "created_at": int(time.time()), + "session_expires_at": session_expires_at } ) @@ -734,6 +742,23 @@ async def complete_interactive_login( # Methods for retrieving user information, session data, and logout operations. # ============================================================================ + async def _is_session_expired_by_ceiling( + self, state_data_dict: dict, store_options: Optional[dict[str, Any]] = None + ) -> bool: + """ + Enforce the IPSIE session_expiry ceiling on a session read. + + Returns True (and deletes the stored session) when the upstream + IdP-asserted ceiling has been reached. Sessions without a + session_expires_at value are never expired on this basis. + """ + internal = state_data_dict.get("internal") or {} + session_expires_at = internal.get("session_expires_at") + if State.is_session_expiry_reached(session_expires_at): + await self._state_store.delete(self._state_identifier, options=store_options) + return True + return False + async def get_user(self, store_options: Optional[dict[str, Any]] = None) -> Optional[dict[str, Any]]: """ Retrieves the user from the store, or None if no user found. @@ -760,6 +785,10 @@ async def get_user(self, store_options: Optional[dict[str, Any]] = None) -> Opti if self._normalize_url(session_domain) != self._normalize_url(current_domain): return None + # IPSIE: force re-auth once the upstream IdP session ceiling passes. + if await self._is_session_expired_by_ceiling(state_data, store_options): + return None + return state_data.get("user") return None @@ -789,6 +818,10 @@ async def get_session(self, store_options: Optional[dict[str, Any]] = None) -> O if self._normalize_url(session_domain) != self._normalize_url(current_domain): return None + # IPSIE: force re-auth once the upstream IdP session ceiling passes. + if await self._is_session_expired_by_ceiling(state_data, store_options): + return None + session_data = {k: v for k, v in state_data.items() if k != "internal"} return session_data @@ -972,6 +1005,19 @@ async def get_access_token( merged_scope = self._merge_scope_with_defaults(scope, audience) + # IPSIE: once the upstream IdP session ceiling has passed, the session + # is expired. Surface "session expired" and do NOT serve a cached token + # or attempt a refresh-token exchange (which would race the platform's + # session revocation). + internal = (state_data_dict or {}).get("internal") or {} + if State.is_session_expiry_reached(internal.get("session_expires_at")): + await self._state_store.delete(self._state_identifier, options=store_options) + raise AccessTokenError( + AccessTokenErrorCode.SESSION_EXPIRED, + "The session has expired because the upstream identity provider's " + "session_expiry was reached. The user needs to re-authenticate." + ) + # Find matching token set token_set = None if state_data_dict and "token_sets" in state_data_dict: diff --git a/src/auth0_server_python/auth_types/__init__.py b/src/auth0_server_python/auth_types/__init__.py index 055103a..9fb6ca2 100644 --- a/src/auth0_server_python/auth_types/__init__.py +++ b/src/auth0_server_python/auth_types/__init__.py @@ -22,6 +22,8 @@ class UserClaims(BaseModel): email: Optional[str] = None email_verified: Optional[bool] = None org_id: Optional[str] = None + # IPSIE SL1 claim: upstream IdP-asserted RP session ceiling (Unix seconds). + session_expiry: Optional[int] = None class Config: extra = "allow" # Allow additional fields not defined in the model @@ -54,6 +56,10 @@ class InternalStateData(BaseModel): """ sid: str created_at: int + # IPSIE session_expiry ceiling (Unix seconds), stamped at session creation + # from the ID token's session_expiry claim. None when the upstream IdP did + # not assert one — in which case existing session behavior is unchanged. + session_expires_at: Optional[int] = None class SessionData(BaseModel): diff --git a/src/auth0_server_python/error/__init__.py b/src/auth0_server_python/error/__init__.py index db4f28e..9de7615 100644 --- a/src/auth0_server_python/error/__init__.py +++ b/src/auth0_server_python/error/__init__.py @@ -198,6 +198,7 @@ class AccessTokenErrorCode: INCORRECT_AUDIENCE = "incorrect_audience" MISSING_SESSION_DOMAIN = "missing_session_domain" DOMAIN_MISMATCH = "domain_mismatch" + SESSION_EXPIRED = "session_expired" class AccessTokenForConnectionErrorCode: diff --git a/src/auth0_server_python/tests/test_server_client.py b/src/auth0_server_python/tests/test_server_client.py index 47ba774..19ad3ee 100644 --- a/src/auth0_server_python/tests/test_server_client.py +++ b/src/auth0_server_python/tests/test_server_client.py @@ -46,7 +46,7 @@ PollingApiError, StartLinkUserError, ) -from auth0_server_python.utils import PKCE +from auth0_server_python.utils import PKCE, State @pytest.mark.asyncio @@ -4816,3 +4816,208 @@ async def _fake_fetch(self, domain): assert exc.value.mfa_requirements is not None finally: ServerClient._fetch_oidc_metadata = original_fetch + + +# ============================================================================= +# IPSIE session_expiry enforcement +# ============================================================================= + + +def test_extract_session_expiry_valid(): + assert State.extract_session_expiry({"session_expiry": 1893456000}) == 1893456000 + + +def test_extract_session_expiry_absent_or_empty(): + assert State.extract_session_expiry(None) is None + assert State.extract_session_expiry({}) is None + assert State.extract_session_expiry({"session_expiry": None}) is None + + +def test_extract_session_expiry_rejects_non_int_and_non_positive(): + # bool is an int subclass but must be rejected + assert State.extract_session_expiry({"session_expiry": True}) is None + assert State.extract_session_expiry({"session_expiry": "1893456000"}) is None + assert State.extract_session_expiry({"session_expiry": 1893456000.0}) is None + assert State.extract_session_expiry({"session_expiry": 0}) is None + assert State.extract_session_expiry({"session_expiry": -5}) is None + + +def test_is_session_expiry_reached_none_never_expires(): + assert State.is_session_expiry_reached(None) is False + + +def test_is_session_expiry_reached_future_and_past(): + now = int(time.time()) + # Comfortably in the future (beyond the leeway window) -> not reached. + assert State.is_session_expiry_reached(now + 3600) is False + # In the past -> reached. + assert State.is_session_expiry_reached(now - 10) is True + + +def test_is_session_expiry_reached_applies_negative_leeway(): + now = int(time.time()) + # Ceiling is 10s away but leeway is 30s, so it's treated as already reached. + assert State.is_session_expiry_reached(now + 10) is True + + +@pytest.mark.asyncio +async def test_get_session_expired_by_ceiling_returns_none_and_deletes(): + now = int(time.time()) + mock_state_store = AsyncMock() + mock_state_store.get.return_value = { + "user": {"sub": "user123"}, + "id_token": "token123", + "internal": {"sid": "some_sid", "created_at": now - 100, "session_expires_at": now - 10}, + } + + client = ServerClient( + domain="auth0.local", + client_id="client_id", + client_secret="client_secret", + transaction_store=AsyncMock(), + state_store=mock_state_store, + secret="some-secret" + ) + + session_data = await client.get_session() + assert session_data is None + mock_state_store.delete.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_get_session_within_ceiling_ok(): + now = int(time.time()) + mock_state_store = AsyncMock() + mock_state_store.get.return_value = { + "user": {"sub": "user123"}, + "id_token": "token123", + "internal": {"sid": "some_sid", "created_at": now, "session_expires_at": now + 3600}, + } + + client = ServerClient( + domain="auth0.local", + client_id="client_id", + client_secret="client_secret", + transaction_store=AsyncMock(), + state_store=mock_state_store, + secret="some-secret" + ) + + session_data = await client.get_session() + assert session_data is not None + assert session_data["user"] == {"sub": "user123"} + mock_state_store.delete.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_get_user_expired_by_ceiling_returns_none_and_deletes(): + now = int(time.time()) + mock_state_store = AsyncMock() + mock_state_store.get.return_value = { + "user": {"sub": "user123"}, + "internal": {"sid": "some_sid", "created_at": now - 100, "session_expires_at": now - 10}, + } + + client = ServerClient( + domain="auth0.local", + client_id="client_id", + client_secret="client_secret", + transaction_store=AsyncMock(), + state_store=mock_state_store, + secret="some-secret" + ) + + user = await client.get_user() + assert user is None + mock_state_store.delete.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_get_user_no_ceiling_unaffected(): + mock_state_store = AsyncMock() + mock_state_store.get.return_value = { + "user": {"sub": "user123"}, + "internal": {"sid": "some_sid", "created_at": int(time.time())}, + } + + client = ServerClient( + domain="auth0.local", + client_id="client_id", + client_secret="client_secret", + transaction_store=AsyncMock(), + state_store=mock_state_store, + secret="some-secret" + ) + + user = await client.get_user() + assert user == {"sub": "user123"} + mock_state_store.delete.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_get_access_token_expired_by_ceiling_raises_without_refresh(mocker): + now = int(time.time()) + mock_state_store = AsyncMock() + mock_state_store.get.return_value = { + "refresh_token": "refresh_xyz", + "token_sets": [ + { + "audience": "default", + "access_token": "cached_token", + "expires_at": now + 500, # still valid, but ceiling overrides + } + ], + "internal": {"sid": "some_sid", "created_at": now - 100, "session_expires_at": now - 10}, + } + + client = ServerClient( + domain="auth0.local", + client_id="client_id", + client_secret="client_secret", + transaction_store=AsyncMock(), + state_store=mock_state_store, + secret="some-secret" + ) + + # If the refresh path is reached, that's a bug — make it explode. + refresh_spy = mocker.patch.object( + client, "get_token_by_refresh_token", new_callable=AsyncMock, + side_effect=AssertionError("refresh must not be attempted after ceiling"), + ) + + with pytest.raises(AccessTokenError) as exc: + await client.get_access_token() + + assert exc.value.code == AccessTokenErrorCode.SESSION_EXPIRED + refresh_spy.assert_not_awaited() + mock_state_store.delete.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_get_access_token_within_ceiling_serves_cached(): + now = int(time.time()) + mock_state_store = AsyncMock() + mock_state_store.get.return_value = { + "refresh_token": "refresh_xyz", + "token_sets": [ + { + "audience": "default", + "access_token": "cached_token", + "expires_at": now + 500, + } + ], + "internal": {"sid": "some_sid", "created_at": now, "session_expires_at": now + 3600}, + } + + client = ServerClient( + domain="auth0.local", + client_id="client_id", + client_secret="client_secret", + transaction_store=AsyncMock(), + state_store=mock_state_store, + secret="some-secret" + ) + + token = await client.get_access_token() + assert token == "cached_token" + mock_state_store.delete.assert_not_awaited() diff --git a/src/auth0_server_python/utils/helpers.py b/src/auth0_server_python/utils/helpers.py index 05cb0f8..11d8fbf 100644 --- a/src/auth0_server_python/utils/helpers.py +++ b/src/auth0_server_python/utils/helpers.py @@ -37,6 +37,48 @@ def generate_code_challenge(cls, code_verifier: str) -> str: class State: + # IPSIE session_expiry: clock-skew leeway (seconds). The session is treated + # as expired slightly *before* the wall-clock ceiling so the SDK never + # serves a session the Auth0 platform has already revoked. Per SDK Product + # Spec guidance. + SESSION_EXPIRY_LEEWAY_SECONDS = 30 + + @classmethod + def extract_session_expiry(cls, claims: Optional[dict[str, Any]]) -> Optional[int]: + """ + Read the IPSIE `session_expiry` claim (Unix seconds) from decoded ID + token claims. Returns None when absent or invalid so existing session + behavior is preserved (the feature is opt-in via the upstream + connection option). + + The IPSIE SL1 profile defines `session_expiry` as a JSON integer of + seconds since epoch. Non-integer or non-positive values are rejected + rather than trusted as a session ceiling. + """ + if not claims: + return None + value = claims.get("session_expiry") + if value is None: + return None + # bool is an int subclass — exclude it explicitly. + if isinstance(value, bool) or not isinstance(value, int): + return None + if value <= 0: + return None + return value + + @classmethod + def is_session_expiry_reached(cls, session_expires_at: Optional[int]) -> bool: + """ + True when the IPSIE session ceiling has been reached (applying negative + leeway for clock skew). None means no ceiling was asserted, so the + session is never expired on this basis. + """ + if session_expires_at is None: + return False + now = int(time.time()) + return now >= (session_expires_at - cls.SESSION_EXPIRY_LEEWAY_SECONDS) + @classmethod def update_state_data( cls, @@ -91,12 +133,20 @@ def update_state_data( else ts for ts in token_sets ] + # Preserve the IPSIE session_expiry ceiling stamped at login. The + # platform does not re-emit session_expiry on a refresh-token grant + # (it doesn't round-trip the upstream IdP), so the value from the + # refreshed ID token must NOT overwrite or erase the original + # ceiling — doing so would let the session outlive its bound. + internal = dict(state_data_dict.get("internal") or {}) + # Return updated state data return { **state_data_dict, "id_token": token_endpoint_response.get("id_token"), "refresh_token": token_endpoint_response.get("refresh_token") or state_data_dict.get("refresh_token"), - "token_sets": token_sets + "token_sets": token_sets, + "internal": internal } else: # Create completely new state data From ec4c45bdd38422fec6c8ae4e75da2562a0b3b648 Mon Sep 17 00:00:00 2001 From: Snehil Kishore Date: Thu, 11 Jun 2026 17:39:28 +0530 Subject: [PATCH 2/3] feat: reject already-expired sessions at login and generalize epoch claim extraction Add a login-time lockout guard to complete_interactive_login: when the upstream IdP asserts a session_expiry ceiling already in the past at login (compared against the ID token iat with the same 30s leeway as read-time enforcement), raise the new flow-agnostic SessionExpiredError instead of persisting an already-expired session. A missing claim stays a no-op, preserving existing behavior. Generalize extract_session_expiry into extract_epoch_claim(claims, name), reused for both session_expiry and iat, and rename the ceiling predicates to is_session_ceiling_reached (read-time) and is_session_ceiling_in_past (login). Document the login rejection in the README, RetrievingData guide, and the ipsie-webapp example. --- README.md | 2 +- examples/RetrievingData.md | 15 + .../auth_server/server_client.py | 25 +- src/auth0_server_python/error/__init__.py | 13 + .../tests/test_server_client.py | 294 ++++++++++++++++-- src/auth0_server_python/utils/helpers.py | 92 +++--- 6 files changed, 371 insertions(+), 70 deletions(-) diff --git a/README.md b/README.md index 0c63cfe..364a7f6 100644 --- a/README.md +++ b/README.md @@ -175,7 +175,7 @@ For more details and examples, see [examples/MultipleCustomDomains.md](examples/ ### 6. Session Expiry from the Upstream IdP -For enterprise connections, the upstream identity provider can cap how long a user's session lives. When the connection is configured to honor it, Auth0 includes a `session_expiry` claim in the ID token, and the SDK enforces this ceiling on every session read. Once it is reached, `get_user()` and `get_session()` return `None`, and `get_access_token()` raises an `AccessTokenError` with code `session_expired`. +For enterprise connections, the upstream identity provider can cap how long a user's session lives. When the connection is configured to honor it, Auth0 includes a `session_expiry` claim in the ID token, and the SDK enforces this ceiling on every session read. Once it is reached, `get_user()` and `get_session()` return `None`, and `get_access_token()` raises an `AccessTokenError` with code `session_expired`. If the asserted ceiling is already in the past at login, `complete_interactive_login()` raises a `SessionExpiredError` instead of persisting an already-expired session. For more details and examples, see [examples/RetrievingData.md](examples/RetrievingData.md#session-expiry-from-the-upstream-idp). diff --git a/examples/RetrievingData.md b/examples/RetrievingData.md index 221c1a8..dca81ba 100644 --- a/examples/RetrievingData.md +++ b/examples/RetrievingData.md @@ -93,6 +93,21 @@ except AccessTokenError as error: When the ceiling is reached, the SDK deletes the stored session before returning, so the next request starts clean. +If the upstream IdP asserts a ceiling that is already in the past at login time, `complete_interactive_login()` raises a `SessionExpiredError` rather than persisting an already-expired session: + +```python +from auth0_server_python.error import SessionExpiredError + +try: + await server_client.complete_interactive_login(url, store_options=store_options) +except SessionExpiredError: + # The session was already past its ceiling on arrival; start a new login. + ... +``` + +> [!NOTE] +> **Upgrading:** with this feature enabled, `get_user()` and `get_session()` can return `None` for a user who was previously logged in, once the upstream ceiling passes. Applications that assumed these always return a value after login should add a null check and route the user back through login. + The `session_expiry` value is also surfaced through the user claims, so you can read it without triggering enforcement: ```python diff --git a/src/auth0_server_python/auth_server/server_client.py b/src/auth0_server_python/auth_server/server_client.py index 0ba698b..08b80c8 100644 --- a/src/auth0_server_python/auth_server/server_client.py +++ b/src/auth0_server_python/auth_server/server_client.py @@ -55,6 +55,7 @@ MissingRequiredArgumentError, MissingTransactionError, PollingApiError, + SessionExpiredError, StartLinkUserError, ) from auth0_server_python.telemetry import Telemetry @@ -640,12 +641,15 @@ async def complete_interactive_login( id_token = token_response.get("id_token") # IPSIE session_expiry ceiling, read from the verified ID token claims. session_expires_at = None + # ID token `iat`, used to detect a ceiling that is already past at login. + issued_at = None if user_info: user_claims = UserClaims.parse_obj(user_info) # authlib populates `userinfo` from parsed ID token claims, so the # IPSIE session_expiry claim may surface here. - session_expires_at = State.extract_session_expiry(user_info) + session_expires_at = State.extract_epoch_claim(user_info, "session_expiry") + issued_at = State.extract_epoch_claim(user_info, "iat") elif id_token: # Fetch JWKS for signature verification jwks = await self._get_jwks_cached(origin_domain, metadata) @@ -663,7 +667,8 @@ async def complete_interactive_login( user_claims = UserClaims.parse_obj(claims) # IPSIE session_expiry ceiling from the verified ID token. - session_expires_at = State.extract_session_expiry(claims) + session_expires_at = State.extract_epoch_claim(claims, "session_expiry") + issued_at = State.extract_epoch_claim(claims, "iat") except ValueError as e: raise ApiError("jwks_key_not_found", str(e)) except jwt.InvalidSignatureError as e: @@ -692,6 +697,10 @@ async def complete_interactive_login( ) + # Refuse to persist a session whose ceiling is already in the past. + if State.is_session_ceiling_in_past(session_expires_at, issued_at): + raise SessionExpiredError() + # Build a token set using the token response data token_set = TokenSet( audience=transaction_data.audience or self.DEFAULT_AUDIENCE_STATE_KEY, @@ -754,7 +763,7 @@ async def _is_session_expired_by_ceiling( """ internal = state_data_dict.get("internal") or {} session_expires_at = internal.get("session_expires_at") - if State.is_session_expiry_reached(session_expires_at): + if State.is_session_ceiling_reached(session_expires_at): await self._state_store.delete(self._state_identifier, options=store_options) return True return False @@ -1005,17 +1014,13 @@ async def get_access_token( merged_scope = self._merge_scope_with_defaults(scope, audience) - # IPSIE: once the upstream IdP session ceiling has passed, the session - # is expired. Surface "session expired" and do NOT serve a cached token - # or attempt a refresh-token exchange (which would race the platform's - # session revocation). + # Once the session ceiling has passed, fail instead of serving or refreshing a token. internal = (state_data_dict or {}).get("internal") or {} - if State.is_session_expiry_reached(internal.get("session_expires_at")): + if State.is_session_ceiling_reached(internal.get("session_expires_at")): await self._state_store.delete(self._state_identifier, options=store_options) raise AccessTokenError( AccessTokenErrorCode.SESSION_EXPIRED, - "The session has expired because the upstream identity provider's " - "session_expiry was reached. The user needs to re-authenticate." + "The session has expired and the user must re-authenticate." ) # Find matching token set diff --git a/src/auth0_server_python/error/__init__.py b/src/auth0_server_python/error/__init__.py index 9de7615..d7a6f03 100644 --- a/src/auth0_server_python/error/__init__.py +++ b/src/auth0_server_python/error/__init__.py @@ -211,6 +211,19 @@ class AccessTokenForConnectionErrorCode: DOMAIN_MISMATCH = "domain_mismatch" +class SessionExpiredError(Auth0Error): + """ + Error raised when a session is rejected at login because its + session_expiry ceiling is already in the past. + """ + code = AccessTokenErrorCode.SESSION_EXPIRED + + def __init__(self, message: Optional[str] = None, cause=None): + super().__init__(message or "The session has expired and the user must re-authenticate.") + self.name = "SessionExpiredError" + self.cause = cause + + class CustomTokenExchangeError(Auth0Error): """ Error raised during custom token exchange operations. diff --git a/src/auth0_server_python/tests/test_server_client.py b/src/auth0_server_python/tests/test_server_client.py index 19ad3ee..bb57901 100644 --- a/src/auth0_server_python/tests/test_server_client.py +++ b/src/auth0_server_python/tests/test_server_client.py @@ -44,6 +44,7 @@ MissingRequiredArgumentError, MissingTransactionError, PollingApiError, + SessionExpiredError, StartLinkUserError, ) from auth0_server_python.utils import PKCE, State @@ -4823,41 +4824,87 @@ async def _fake_fetch(self, domain): # ============================================================================= -def test_extract_session_expiry_valid(): - assert State.extract_session_expiry({"session_expiry": 1893456000}) == 1893456000 +def test_extract_epoch_claim_valid(): + assert State.extract_epoch_claim({"session_expiry": 1893456000}, "session_expiry") == 1893456000 -def test_extract_session_expiry_absent_or_empty(): - assert State.extract_session_expiry(None) is None - assert State.extract_session_expiry({}) is None - assert State.extract_session_expiry({"session_expiry": None}) is None +def test_extract_epoch_claim_reused_for_iat(): + # Same extractor/validator serves any Unix-seconds claim, e.g. iat. + assert State.extract_epoch_claim({"iat": 1893456000}, "iat") == 1893456000 -def test_extract_session_expiry_rejects_non_int_and_non_positive(): +def test_extract_epoch_claim_absent_or_empty(): + assert State.extract_epoch_claim(None, "session_expiry") is None + assert State.extract_epoch_claim({}, "session_expiry") is None + assert State.extract_epoch_claim({"session_expiry": None}, "session_expiry") is None + + +def test_extract_epoch_claim_rejects_non_int_and_non_positive(): # bool is an int subclass but must be rejected - assert State.extract_session_expiry({"session_expiry": True}) is None - assert State.extract_session_expiry({"session_expiry": "1893456000"}) is None - assert State.extract_session_expiry({"session_expiry": 1893456000.0}) is None - assert State.extract_session_expiry({"session_expiry": 0}) is None - assert State.extract_session_expiry({"session_expiry": -5}) is None + assert State.extract_epoch_claim({"session_expiry": True}, "session_expiry") is None + assert State.extract_epoch_claim({"session_expiry": "1893456000"}, "session_expiry") is None + assert State.extract_epoch_claim({"session_expiry": 1893456000.0}, "session_expiry") is None + assert State.extract_epoch_claim({"session_expiry": 0}, "session_expiry") is None + assert State.extract_epoch_claim({"session_expiry": -5}, "session_expiry") is None -def test_is_session_expiry_reached_none_never_expires(): - assert State.is_session_expiry_reached(None) is False +def test_is_session_ceiling_reached_none_never_expires(): + assert State.is_session_ceiling_reached(None) is False -def test_is_session_expiry_reached_future_and_past(): +def test_is_session_ceiling_reached_future_and_past(): now = int(time.time()) # Comfortably in the future (beyond the leeway window) -> not reached. - assert State.is_session_expiry_reached(now + 3600) is False + assert State.is_session_ceiling_reached(now + 3600) is False # In the past -> reached. - assert State.is_session_expiry_reached(now - 10) is True + assert State.is_session_ceiling_reached(now - 10) is True -def test_is_session_expiry_reached_applies_negative_leeway(): +def test_is_session_ceiling_reached_applies_negative_leeway(): now = int(time.time()) # Ceiling is 10s away but leeway is 30s, so it's treated as already reached. - assert State.is_session_expiry_reached(now + 10) is True + assert State.is_session_ceiling_reached(now + 10) is True + + +def test_is_session_ceiling_in_past_none_is_safe_default(): + # No ceiling asserted -> never treated as expired. + assert State.is_session_ceiling_in_past(None, 1893456000) is False + assert State.is_session_ceiling_in_past(None, None) is False + + +def test_is_session_ceiling_in_past_past_ceiling_relative_to_iat(): + iat = 1893456000 + # Ceiling well before iat -> already lapsed at login. + assert State.is_session_ceiling_in_past(iat - 3600, iat) is True + + +def test_is_session_ceiling_in_past_future_ceiling_relative_to_iat(): + iat = 1893456000 + # Ceiling well after iat -> not lapsed. + assert State.is_session_ceiling_in_past(iat + 3600, iat) is False + + +def test_is_session_ceiling_in_past_falls_back_to_now_when_iat_absent(): + now = int(time.time()) + # No iat -> compare against wall-clock now; a past ceiling is lapsed. + assert State.is_session_ceiling_in_past(now - 100, None) is True + + +def test_is_session_ceiling_in_past_leeway_boundary(): + iat = 1893456000 + leeway = State.SESSION_EXPIRY_LEEWAY_SECONDS + # Ceiling exactly at iat + leeway is treated as already lapsed... + assert State.is_session_ceiling_in_past(iat + leeway, iat) is True + # ...one second beyond the leeway window is not. + assert State.is_session_ceiling_in_past(iat + leeway + 1, iat) is False + + +def test_session_expired_error_message_is_generic(): + message = str(SessionExpiredError()) + # States the reason without leaking any timestamps or values. + assert message == "The session has expired and the user must re-authenticate." + assert not any(ch.isdigit() for ch in message) + assert SessionExpiredError().code == AccessTokenErrorCode.SESSION_EXPIRED @pytest.mark.asyncio @@ -5021,3 +5068,212 @@ async def test_get_access_token_within_ceiling_serves_cached(): token = await client.get_access_token() assert token == "cached_token" mock_state_store.delete.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_complete_interactive_login_rejects_already_expired_ceiling(mocker): + """A session_expiry already in the past at login is rejected, not persisted.""" + iat = int(time.time()) + + mock_tx_store = AsyncMock() + mock_tx_store.get.return_value = TransactionData( + code_verifier="123", + domain="tenant.auth0.com", + ) + mock_state_store = AsyncMock() + + client = ServerClient( + domain="tenant.auth0.com", + client_id="test_client", + client_secret="test_secret", + transaction_store=mock_tx_store, + state_store=mock_state_store, + secret="test_secret_key_32_chars_long!!", + ) + + # Mock OIDC metadata + mocker.patch.object( + client, + "_get_oidc_metadata_cached", + return_value={"issuer": "https://tenant.auth0.com/", "token_endpoint": "https://tenant.auth0.com/token"} + ) + + # Mock JWKS fetch + mocker.patch.object( + client, + "_get_jwks_cached", + return_value={"keys": [{"kty": "RSA", "kid": "test-key"}]} + ) + + # Mock OAuth fetch_token + async_fetch_token = AsyncMock() + async_fetch_token.return_value = { + "access_token": "token123", + "id_token": "id_token_jwt", + "scope": "openid profile" + } + mocker.patch.object(client._oauth, "fetch_token", async_fetch_token) + + # Mock jwt.get_unverified_header + mocker.patch("jwt.get_unverified_header", return_value={"kid": "test-key"}) + + # Mock PyJWK.from_dict + mock_signing_key = mocker.MagicMock() + mock_signing_key.key = "mock_pem_key" + mocker.patch("jwt.PyJWK.from_dict", return_value=mock_signing_key) + + # Mock jwt.decode with a ceiling already in the past relative to iat + mocker.patch("jwt.decode", return_value={ + "sub": "user123", + "iss": "https://tenant.auth0.com/", + "aud": "test_client", + "iat": iat, + "session_expiry": iat - 3600, + }) + + with pytest.raises(SessionExpiredError) as exc: + await client.complete_interactive_login("http://localhost/callback?code=abc&state=xyz") + + assert exc.value.code == AccessTokenErrorCode.SESSION_EXPIRED + # The already-expired session must never be persisted, and the transaction is + # left intact so the user can retry a fresh login. + mock_state_store.set.assert_not_awaited() + mock_tx_store.delete.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_complete_interactive_login_future_ceiling_persists(mocker): + """A future session_expiry is stamped on the session and login succeeds.""" + iat = int(time.time()) + ceiling = iat + 3600 + + mock_tx_store = AsyncMock() + mock_tx_store.get.return_value = TransactionData( + code_verifier="123", + domain="tenant.auth0.com", + ) + mock_state_store = AsyncMock() + + client = ServerClient( + domain="tenant.auth0.com", + client_id="test_client", + client_secret="test_secret", + transaction_store=mock_tx_store, + state_store=mock_state_store, + secret="test_secret_key_32_chars_long!!", + ) + + # Mock OIDC metadata + mocker.patch.object( + client, + "_get_oidc_metadata_cached", + return_value={"issuer": "https://tenant.auth0.com/", "token_endpoint": "https://tenant.auth0.com/token"} + ) + + # Mock JWKS fetch + mocker.patch.object( + client, + "_get_jwks_cached", + return_value={"keys": [{"kty": "RSA", "kid": "test-key"}]} + ) + + # Mock OAuth fetch_token + async_fetch_token = AsyncMock() + async_fetch_token.return_value = { + "access_token": "token123", + "id_token": "id_token_jwt", + "scope": "openid profile" + } + mocker.patch.object(client._oauth, "fetch_token", async_fetch_token) + + # Mock jwt.get_unverified_header + mocker.patch("jwt.get_unverified_header", return_value={"kid": "test-key"}) + + # Mock PyJWK.from_dict + mock_signing_key = mocker.MagicMock() + mock_signing_key.key = "mock_pem_key" + mocker.patch("jwt.PyJWK.from_dict", return_value=mock_signing_key) + + # Mock jwt.decode with a ceiling comfortably in the future + mocker.patch("jwt.decode", return_value={ + "sub": "user123", + "iss": "https://tenant.auth0.com/", + "aud": "test_client", + "iat": iat, + "session_expiry": ceiling, + }) + + result = await client.complete_interactive_login("http://localhost/callback?code=abc&state=xyz") + + assert "state_data" in result + mock_state_store.set.assert_awaited_once() + stored_state = mock_state_store.set.call_args.args[1] + assert stored_state.internal.session_expires_at == ceiling + + +@pytest.mark.asyncio +async def test_complete_interactive_login_no_ceiling_persists_normally(mocker): + """No session_expiry claim -> login behaves exactly as before (no ceiling).""" + iat = int(time.time()) + + mock_tx_store = AsyncMock() + mock_tx_store.get.return_value = TransactionData( + code_verifier="123", + domain="tenant.auth0.com", + ) + mock_state_store = AsyncMock() + + client = ServerClient( + domain="tenant.auth0.com", + client_id="test_client", + client_secret="test_secret", + transaction_store=mock_tx_store, + state_store=mock_state_store, + secret="test_secret_key_32_chars_long!!", + ) + + # Mock OIDC metadata + mocker.patch.object( + client, + "_get_oidc_metadata_cached", + return_value={"issuer": "https://tenant.auth0.com/", "token_endpoint": "https://tenant.auth0.com/token"} + ) + + # Mock JWKS fetch + mocker.patch.object( + client, + "_get_jwks_cached", + return_value={"keys": [{"kty": "RSA", "kid": "test-key"}]} + ) + + # Mock OAuth fetch_token + async_fetch_token = AsyncMock() + async_fetch_token.return_value = { + "access_token": "token123", + "id_token": "id_token_jwt", + "scope": "openid profile" + } + mocker.patch.object(client._oauth, "fetch_token", async_fetch_token) + + # Mock jwt.get_unverified_header + mocker.patch("jwt.get_unverified_header", return_value={"kid": "test-key"}) + + # Mock PyJWK.from_dict + mock_signing_key = mocker.MagicMock() + mock_signing_key.key = "mock_pem_key" + mocker.patch("jwt.PyJWK.from_dict", return_value=mock_signing_key) + + # Mock jwt.decode without a session_expiry claim + mocker.patch("jwt.decode", return_value={ + "sub": "user123", + "iss": "https://tenant.auth0.com/", + "aud": "test_client", + "iat": iat, + }) + + result = await client.complete_interactive_login("http://localhost/callback?code=abc&state=xyz") + + assert "state_data" in result + mock_state_store.set.assert_awaited_once() + stored_state = mock_state_store.set.call_args.args[1] + assert stored_state.internal.session_expires_at is None diff --git a/src/auth0_server_python/utils/helpers.py b/src/auth0_server_python/utils/helpers.py index 11d8fbf..9c5b6f5 100644 --- a/src/auth0_server_python/utils/helpers.py +++ b/src/auth0_server_python/utils/helpers.py @@ -37,48 +37,10 @@ def generate_code_challenge(cls, code_verifier: str) -> str: class State: - # IPSIE session_expiry: clock-skew leeway (seconds). The session is treated - # as expired slightly *before* the wall-clock ceiling so the SDK never - # serves a session the Auth0 platform has already revoked. Per SDK Product - # Spec guidance. + # Clock-skew leeway (seconds): treat the session as expired slightly before + # the ceiling so the SDK never serves a session the platform has revoked. SESSION_EXPIRY_LEEWAY_SECONDS = 30 - @classmethod - def extract_session_expiry(cls, claims: Optional[dict[str, Any]]) -> Optional[int]: - """ - Read the IPSIE `session_expiry` claim (Unix seconds) from decoded ID - token claims. Returns None when absent or invalid so existing session - behavior is preserved (the feature is opt-in via the upstream - connection option). - - The IPSIE SL1 profile defines `session_expiry` as a JSON integer of - seconds since epoch. Non-integer or non-positive values are rejected - rather than trusted as a session ceiling. - """ - if not claims: - return None - value = claims.get("session_expiry") - if value is None: - return None - # bool is an int subclass — exclude it explicitly. - if isinstance(value, bool) or not isinstance(value, int): - return None - if value <= 0: - return None - return value - - @classmethod - def is_session_expiry_reached(cls, session_expires_at: Optional[int]) -> bool: - """ - True when the IPSIE session ceiling has been reached (applying negative - leeway for clock skew). None means no ceiling was asserted, so the - session is never expired on this basis. - """ - if session_expires_at is None: - return False - now = int(time.time()) - return now >= (session_expires_at - cls.SESSION_EXPIRY_LEEWAY_SECONDS) - @classmethod def update_state_data( cls, @@ -227,6 +189,56 @@ def update_state_data_for_connection_token_set( "connection_token_sets": connection_token_sets } + @classmethod + def extract_epoch_claim(cls, claims: Optional[dict[str, Any]], name: str) -> Optional[int]: + """ + Read a Unix-seconds claim (e.g. `session_expiry`, `iat`) from decoded ID + token claims, or None when absent or invalid. + + The value must be a positive JSON integer; non-integer or non-positive + values are rejected rather than trusted. + """ + if not claims: + return None + value = claims.get(name) + if value is None: + return None + # bool is an int subclass — exclude it explicitly. + if isinstance(value, bool) or not isinstance(value, int): + return None + if value <= 0: + return None + return value + + @classmethod + def is_session_ceiling_reached(cls, session_expires_at: Optional[int]) -> bool: + """ + True when the session ceiling has been reached (applying negative + leeway for clock skew). None means no ceiling was asserted, so the + session is never expired on this basis. + """ + if session_expires_at is None: + return False + now = int(time.time()) + return now >= (session_expires_at - cls.SESSION_EXPIRY_LEEWAY_SECONDS) + + @classmethod + def is_session_ceiling_in_past( + cls, session_expires_at: Optional[int], issued_at: Optional[int] = None + ) -> bool: + """ + True when the session ceiling is already in the past at login. + + Compares the ceiling against the ID token `iat`, or wall-clock now when + `iat` is absent, using the same leeway as is_session_ceiling_reached. A + None ceiling means none was asserted and is never treated as expired. + """ + if session_expires_at is None: + return False + reference = issued_at if issued_at else int(time.time()) + return session_expires_at <= (reference + cls.SESSION_EXPIRY_LEEWAY_SECONDS) + + class URL: @staticmethod def build_url(base_url: str, params: dict[str, Any]) -> str: From a1c5d5ea540c44596ebd6a50cc47d7f6f231cfab Mon Sep 17 00:00:00 2001 From: Snehil Kishore Date: Thu, 11 Jun 2026 22:29:18 +0530 Subject: [PATCH 3/3] refactor: read session_expiry/iat as raw claims and trust the platform boundary The signature-verified, Auth0-issued ID token has already been validated upstream (the platform refuses to emit a malformed session_expiry), so the SDK reads it like every other operational claim instead of running a bespoke validator. Removes State.extract_epoch_claim and reads session_expiry/iat with a plain .get() at both extraction sites; the None guards in the ceiling comparison functions still deliver the absent/null "no ceiling" safe default. Production-reachable inputs (absent/null, clean integer) are unchanged; only unreachable malformed values change behavior, now failing closed rather than being silently accepted. --- .../auth_server/server_client.py | 8 +++---- .../tests/test_server_client.py | 24 ------------------- src/auth0_server_python/utils/helpers.py | 21 ---------------- 3 files changed, 4 insertions(+), 49 deletions(-) diff --git a/src/auth0_server_python/auth_server/server_client.py b/src/auth0_server_python/auth_server/server_client.py index 08b80c8..ab3cbbe 100644 --- a/src/auth0_server_python/auth_server/server_client.py +++ b/src/auth0_server_python/auth_server/server_client.py @@ -648,8 +648,8 @@ async def complete_interactive_login( user_claims = UserClaims.parse_obj(user_info) # authlib populates `userinfo` from parsed ID token claims, so the # IPSIE session_expiry claim may surface here. - session_expires_at = State.extract_epoch_claim(user_info, "session_expiry") - issued_at = State.extract_epoch_claim(user_info, "iat") + session_expires_at = user_info.get("session_expiry") + issued_at = user_info.get("iat") elif id_token: # Fetch JWKS for signature verification jwks = await self._get_jwks_cached(origin_domain, metadata) @@ -667,8 +667,8 @@ async def complete_interactive_login( user_claims = UserClaims.parse_obj(claims) # IPSIE session_expiry ceiling from the verified ID token. - session_expires_at = State.extract_epoch_claim(claims, "session_expiry") - issued_at = State.extract_epoch_claim(claims, "iat") + session_expires_at = claims.get("session_expiry") + issued_at = claims.get("iat") except ValueError as e: raise ApiError("jwks_key_not_found", str(e)) except jwt.InvalidSignatureError as e: diff --git a/src/auth0_server_python/tests/test_server_client.py b/src/auth0_server_python/tests/test_server_client.py index bb57901..ee10d19 100644 --- a/src/auth0_server_python/tests/test_server_client.py +++ b/src/auth0_server_python/tests/test_server_client.py @@ -4824,30 +4824,6 @@ async def _fake_fetch(self, domain): # ============================================================================= -def test_extract_epoch_claim_valid(): - assert State.extract_epoch_claim({"session_expiry": 1893456000}, "session_expiry") == 1893456000 - - -def test_extract_epoch_claim_reused_for_iat(): - # Same extractor/validator serves any Unix-seconds claim, e.g. iat. - assert State.extract_epoch_claim({"iat": 1893456000}, "iat") == 1893456000 - - -def test_extract_epoch_claim_absent_or_empty(): - assert State.extract_epoch_claim(None, "session_expiry") is None - assert State.extract_epoch_claim({}, "session_expiry") is None - assert State.extract_epoch_claim({"session_expiry": None}, "session_expiry") is None - - -def test_extract_epoch_claim_rejects_non_int_and_non_positive(): - # bool is an int subclass but must be rejected - assert State.extract_epoch_claim({"session_expiry": True}, "session_expiry") is None - assert State.extract_epoch_claim({"session_expiry": "1893456000"}, "session_expiry") is None - assert State.extract_epoch_claim({"session_expiry": 1893456000.0}, "session_expiry") is None - assert State.extract_epoch_claim({"session_expiry": 0}, "session_expiry") is None - assert State.extract_epoch_claim({"session_expiry": -5}, "session_expiry") is None - - def test_is_session_ceiling_reached_none_never_expires(): assert State.is_session_ceiling_reached(None) is False diff --git a/src/auth0_server_python/utils/helpers.py b/src/auth0_server_python/utils/helpers.py index 9c5b6f5..70aa260 100644 --- a/src/auth0_server_python/utils/helpers.py +++ b/src/auth0_server_python/utils/helpers.py @@ -189,27 +189,6 @@ def update_state_data_for_connection_token_set( "connection_token_sets": connection_token_sets } - @classmethod - def extract_epoch_claim(cls, claims: Optional[dict[str, Any]], name: str) -> Optional[int]: - """ - Read a Unix-seconds claim (e.g. `session_expiry`, `iat`) from decoded ID - token claims, or None when absent or invalid. - - The value must be a positive JSON integer; non-integer or non-positive - values are rejected rather than trusted. - """ - if not claims: - return None - value = claims.get(name) - if value is None: - return None - # bool is an int subclass — exclude it explicitly. - if isinstance(value, bool) or not isinstance(value, int): - return None - if value <= 0: - return None - return value - @classmethod def is_session_ceiling_reached(cls, session_expires_at: Optional[int]) -> bool: """