Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ The SDK handles per-domain OIDC discovery, JWKS fetching, issuer validation, and

For more details and examples, see [examples/MultipleCustomDomains.md](examples/MultipleCustomDomains.md).

### 6. Session Expiry from the Upstream IdP

For enterprise connections, the upstream identity provider can cap how long a user's session lives. When the connection is configured to honor it, Auth0 includes a `session_expiry` claim in the ID token, and the SDK enforces this ceiling on every session read. Once it is reached, `get_user()` and `get_session()` return `None`, and `get_access_token()` raises an `AccessTokenError` with code `session_expired`. If the asserted ceiling is already in the past at login, `complete_interactive_login()` raises a `SessionExpiredError` instead of persisting an already-expired session.

For more details and examples, see [examples/RetrievingData.md](examples/RetrievingData.md#session-expiry-from-the-upstream-idp).

## Feedback

### Contributing
Expand Down
48 changes: 48 additions & 0 deletions examples/RetrievingData.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,54 @@ access_token = await server_client.get_access_token(store_options=store_options)

Read more above in [Configuring the Store](./ConfigureStore.md).

## Session Expiry from the Upstream IdP

For enterprise connections, the upstream identity provider can impose a ceiling on how long the user's session may live. When the connection is configured to honor it, Auth0 includes a `session_expiry` claim (an absolute Unix timestamp, in seconds) in the ID token. The SDK reads this value at login, stores it with the session, and enforces it on every subsequent read.

Once the ceiling is reached, the read methods behave as follows:

- `get_user()` returns `None`, as if no session exists.
- `get_session()` returns `None`, as if no session exists.
- `get_access_token()` raises an `AccessTokenError` with code `session_expired`.

```python
from auth0_server_python.error import AccessTokenError, AccessTokenErrorCode

try:
access_token = await server_client.get_access_token(store_options=store_options)
except AccessTokenError as error:
if error.code == AccessTokenErrorCode.SESSION_EXPIRED:
# The upstream session ceiling has been reached; start a new login.
...
```

When the ceiling is reached, the SDK deletes the stored session before returning, so the next request starts clean.

If the upstream IdP asserts a ceiling that is already in the past at login time, `complete_interactive_login()` raises a `SessionExpiredError` rather than persisting an already-expired session:

```python
from auth0_server_python.error import SessionExpiredError

try:
await server_client.complete_interactive_login(url, store_options=store_options)
except SessionExpiredError:
# The session was already past its ceiling on arrival; start a new login.
...
```

> [!NOTE]
> **Upgrading:** with this feature enabled, `get_user()` and `get_session()` can return `None` for a user who was previously logged in, once the upstream ceiling passes. Applications that assumed these always return a value after login should add a null check and route the user back through login.

The `session_expiry` value is also surfaced through the user claims, so you can read it without triggering enforcement:

```python
user = await server_client.get_user(store_options=store_options)
session_expires_at = (user or {}).get("session_expiry")
```

> [!NOTE]
> Enforcement applies a small negative leeway (30 seconds) to account for clock skew, so a session is treated as expired slightly before the exact `session_expiry` timestamp. The refresh-token grant preserves the original ceiling - refreshing an access token does not extend the upstream session.

## Multi-Resource Refresh Tokens (MRRT)

Multi-Resource Refresh Tokens allow using a single refresh token to obtain access tokens for multiple audiences, simplifying token management in applications that interact with multiple backend services.
Expand Down
53 changes: 52 additions & 1 deletion src/auth0_server_python/auth_server/server_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
MissingRequiredArgumentError,
MissingTransactionError,
PollingApiError,
SessionExpiredError,
StartLinkUserError,
)
from auth0_server_python.telemetry import Telemetry
Expand Down Expand Up @@ -638,9 +639,17 @@ async def complete_interactive_login(
user_info = token_response.get("userinfo")
user_claims = None
id_token = token_response.get("id_token")
# IPSIE session_expiry ceiling, read from the verified ID token claims.
session_expires_at = None
# ID token `iat`, used to detect a ceiling that is already past at login.
issued_at = None

if user_info:
user_claims = UserClaims.parse_obj(user_info)
# authlib populates `userinfo` from parsed ID token claims, so the
# IPSIE session_expiry claim may surface here.
session_expires_at = user_info.get("session_expiry")
issued_at = user_info.get("iat")
elif id_token:
# Fetch JWKS for signature verification
jwks = await self._get_jwks_cached(origin_domain, metadata)
Expand All @@ -657,6 +666,9 @@ async def complete_interactive_login(
raise IssuerValidationError("ID token issuer mismatch. Ensure your Auth0 domain is configured correctly.")

user_claims = UserClaims.parse_obj(claims)
# IPSIE session_expiry ceiling from the verified ID token.
session_expires_at = claims.get("session_expiry")
issued_at = claims.get("iat")
except ValueError as e:
raise ApiError("jwks_key_not_found", str(e))
except jwt.InvalidSignatureError as e:
Expand Down Expand Up @@ -685,6 +697,10 @@ async def complete_interactive_login(
)


# Refuse to persist a session whose ceiling is already in the past.
if State.is_session_ceiling_in_past(session_expires_at, issued_at):
raise SessionExpiredError()

# Build a token set using the token response data
token_set = TokenSet(
audience=transaction_data.audience or self.DEFAULT_AUDIENCE_STATE_KEY,
Expand All @@ -708,7 +724,8 @@ async def complete_interactive_login(
domain=origin_domain,
internal={
"sid": sid,
"created_at": int(time.time())
"created_at": int(time.time()),
"session_expires_at": session_expires_at
}
)

Expand All @@ -734,6 +751,23 @@ async def complete_interactive_login(
# Methods for retrieving user information, session data, and logout operations.
# ============================================================================

async def _is_session_expired_by_ceiling(
self, state_data_dict: dict, store_options: Optional[dict[str, Any]] = None
) -> bool:
"""
Enforce the IPSIE session_expiry ceiling on a session read.

Returns True (and deletes the stored session) when the upstream
IdP-asserted ceiling has been reached. Sessions without a
session_expires_at value are never expired on this basis.
"""
internal = state_data_dict.get("internal") or {}
session_expires_at = internal.get("session_expires_at")
if State.is_session_ceiling_reached(session_expires_at):
await self._state_store.delete(self._state_identifier, options=store_options)
return True
return False

async def get_user(self, store_options: Optional[dict[str, Any]] = None) -> Optional[dict[str, Any]]:
"""
Retrieves the user from the store, or None if no user found.
Expand All @@ -760,6 +794,10 @@ async def get_user(self, store_options: Optional[dict[str, Any]] = None) -> Opti
if self._normalize_url(session_domain) != self._normalize_url(current_domain):
return None

# IPSIE: force re-auth once the upstream IdP session ceiling passes.
if await self._is_session_expired_by_ceiling(state_data, store_options):
return None

return state_data.get("user")
return None

Expand Down Expand Up @@ -789,6 +827,10 @@ async def get_session(self, store_options: Optional[dict[str, Any]] = None) -> O
if self._normalize_url(session_domain) != self._normalize_url(current_domain):
return None

# IPSIE: force re-auth once the upstream IdP session ceiling passes.
if await self._is_session_expired_by_ceiling(state_data, store_options):
return None

session_data = {k: v for k, v in state_data.items()
if k != "internal"}
return session_data
Expand Down Expand Up @@ -972,6 +1014,15 @@ async def get_access_token(

merged_scope = self._merge_scope_with_defaults(scope, audience)

# Once the session ceiling has passed, fail instead of serving or refreshing a token.
internal = (state_data_dict or {}).get("internal") or {}
if State.is_session_ceiling_reached(internal.get("session_expires_at")):
await self._state_store.delete(self._state_identifier, options=store_options)
raise AccessTokenError(
AccessTokenErrorCode.SESSION_EXPIRED,
"The session has expired and the user must re-authenticate."
)

# Find matching token set
token_set = None
if state_data_dict and "token_sets" in state_data_dict:
Expand Down
6 changes: 6 additions & 0 deletions src/auth0_server_python/auth_types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class UserClaims(BaseModel):
email: Optional[str] = None
email_verified: Optional[bool] = None
org_id: Optional[str] = None
# IPSIE SL1 claim: upstream IdP-asserted RP session ceiling (Unix seconds).
session_expiry: Optional[int] = None

class Config:
extra = "allow" # Allow additional fields not defined in the model
Expand Down Expand Up @@ -54,6 +56,10 @@ class InternalStateData(BaseModel):
"""
sid: str
created_at: int
# IPSIE session_expiry ceiling (Unix seconds), stamped at session creation
# from the ID token's session_expiry claim. None when the upstream IdP did
# not assert one — in which case existing session behavior is unchanged.
session_expires_at: Optional[int] = None


class SessionData(BaseModel):
Expand Down
14 changes: 14 additions & 0 deletions src/auth0_server_python/error/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ class AccessTokenErrorCode:
INCORRECT_AUDIENCE = "incorrect_audience"
MISSING_SESSION_DOMAIN = "missing_session_domain"
DOMAIN_MISMATCH = "domain_mismatch"
SESSION_EXPIRED = "session_expired"


class AccessTokenForConnectionErrorCode:
Expand All @@ -210,6 +211,19 @@ class AccessTokenForConnectionErrorCode:
DOMAIN_MISMATCH = "domain_mismatch"


class SessionExpiredError(Auth0Error):
"""
Error raised when a session is rejected at login because its
session_expiry ceiling is already in the past.
"""
code = AccessTokenErrorCode.SESSION_EXPIRED

def __init__(self, message: Optional[str] = None, cause=None):
super().__init__(message or "The session has expired and the user must re-authenticate.")
self.name = "SessionExpiredError"
self.cause = cause


class CustomTokenExchangeError(Auth0Error):
"""
Error raised during custom token exchange operations.
Expand Down
Loading
Loading