From 3ce3ef2800bb0ae52735da2e06822a0b20c747e5 Mon Sep 17 00:00:00 2001 From: Dopeamin Date: Wed, 11 Jun 2025 14:31:44 +0200 Subject: [PATCH 1/8] Enhance session token validation by adding support for InvalidAlgorithmError and updating error handling for token decoding. Update tests to cover disallowed algorithms. --- .../services/implementation/session_service.py | 18 +++++++++++++----- tests/unit/test_session_service.py | 11 +++++++++++ 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/src/corbado_python_sdk/services/implementation/session_service.py b/src/corbado_python_sdk/services/implementation/session_service.py index f752ba5..a70855c 100644 --- a/src/corbado_python_sdk/services/implementation/session_service.py +++ b/src/corbado_python_sdk/services/implementation/session_service.py @@ -3,6 +3,7 @@ ExpiredSignatureError, ImmatureSignatureError, InvalidSignatureError, + InvalidAlgorithmError, decode, ) from jwt.jwks_client import PyJWKClient @@ -16,6 +17,7 @@ ) DEFAULT_SESSION_TOKEN_LENGTH = 300 +ALLOWED_ALGS = {"RS256"} class SessionService(BaseModel): @@ -90,7 +92,7 @@ def validate_token(self, session_token: StrictStr) -> UserEntity: # decode short session (jwt) with signing key try: - payload = decode(jwt=session_token, key=signing_key.key, algorithms=["RS256"]) + payload = decode(jwt=session_token, key=signing_key.key, algorithms=list(ALLOWED_ALGS)) # extract information from decoded payload token_issuer: str = payload.get("iss") @@ -104,15 +106,21 @@ def validate_token(self, session_token: StrictStr) -> UserEntity: ) except ExpiredSignatureError as error: raise TokenValidationException( - error_type=ValidationErrorType.CODE_JWT_INVALID_SIGNATURE, - message=f"Error occured during token decode: {session_token}. {ValidationErrorType.CODE_JWT_INVALID_SIGNATURE.value}", + error_type=ValidationErrorType.CODE_JWT_EXPIRED, + message=f"Error occured during token decode: {session_token}. {ValidationErrorType.CODE_JWT_EXPIRED.value}", original_exception=error, ) except InvalidSignatureError as error: raise TokenValidationException( - error_type=ValidationErrorType.CODE_JWT_EXPIRED, - message=f"Error occured during token decode: {session_token}. {ValidationErrorType.CODE_JWT_EXPIRED.value}", + error_type=ValidationErrorType.CODE_JWT_INVALID_SIGNATURE, + message=f"Error occured during token decode: {session_token}. {ValidationErrorType.CODE_JWT_INVALID_SIGNATURE.value}", + original_exception=error, + ) + except InvalidAlgorithmError as error: + raise TokenValidationException( + error_type=ValidationErrorType.CODE_JWT_INVALID_SIGNATURE, + message="Algorithm not allowed", original_exception=error, ) diff --git a/tests/unit/test_session_service.py b/tests/unit/test_session_service.py index 6c8122e..6261cb7 100644 --- a/tests/unit/test_session_service.py +++ b/tests/unit/test_session_service.py @@ -9,6 +9,7 @@ ExpiredSignatureError, ImmatureSignatureError, InvalidSignatureError, + InvalidAlgorithmError, PyJWKClientError, encode, ) @@ -193,6 +194,16 @@ def _provide_jwts(self): None, None, ), + # Disallowed algorithm "none" + ( + False, + "eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0." + "eyJpc3MiOiJodHRwczovL2F1dGguYWNtZS5jb20iLCJzdWIiOiIxMjM0NSIsImlhdCI6" + + str(int(time())) + + "f.", + InvalidAlgorithmError, + "Algorithm not allowed", + ), ] @classmethod From a44b343cdf1363c64517fb8d49e6d61cf873f79a Mon Sep 17 00:00:00 2001 From: Dopeamin Date: Wed, 11 Jun 2025 14:38:12 +0200 Subject: [PATCH 2/8] fixes lnting --- .../services/implementation/session_service.py | 2 +- tests/unit/test_session_service.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/corbado_python_sdk/services/implementation/session_service.py b/src/corbado_python_sdk/services/implementation/session_service.py index a70855c..7db7a04 100644 --- a/src/corbado_python_sdk/services/implementation/session_service.py +++ b/src/corbado_python_sdk/services/implementation/session_service.py @@ -2,8 +2,8 @@ from jwt import ( ExpiredSignatureError, ImmatureSignatureError, - InvalidSignatureError, InvalidAlgorithmError, + InvalidSignatureError, decode, ) from jwt.jwks_client import PyJWKClient diff --git a/tests/unit/test_session_service.py b/tests/unit/test_session_service.py index 6261cb7..f738127 100644 --- a/tests/unit/test_session_service.py +++ b/tests/unit/test_session_service.py @@ -8,8 +8,8 @@ DecodeError, ExpiredSignatureError, ImmatureSignatureError, - InvalidSignatureError, InvalidAlgorithmError, + InvalidSignatureError, PyJWKClientError, encode, ) From 9fc73bd78ad2b5aa5a98a1db10d6f877f3e7473e Mon Sep 17 00:00:00 2001 From: Dopeamin Date: Wed, 11 Jun 2025 15:49:20 +0200 Subject: [PATCH 3/8] Enhance JWT validation in SessionService by adding pre-flight checks for algorithm restrictions. Update tests to validate handling of disallowed algorithms, including "none". --- .../implementation/session_service.py | 18 +++++++ tests/unit/test_session_service.py | 52 +++++++++++++------ 2 files changed, 53 insertions(+), 17 deletions(-) diff --git a/src/corbado_python_sdk/services/implementation/session_service.py b/src/corbado_python_sdk/services/implementation/session_service.py index 7db7a04..c618177 100644 --- a/src/corbado_python_sdk/services/implementation/session_service.py +++ b/src/corbado_python_sdk/services/implementation/session_service.py @@ -1,5 +1,6 @@ import jwt from jwt import ( + get_unverified_header, # ← added ExpiredSignatureError, ImmatureSignatureError, InvalidAlgorithmError, @@ -80,6 +81,23 @@ def validate_token(self, session_token: StrictStr) -> UserEntity: message=ValidationErrorType.CODE_JWT_EMPTY_SESSION_TOKEN.name, ) + # ---- pre-flight alg rejection ---- + try: + header = get_unverified_header(session_token) + except Exception as err: + raise TokenValidationException( + error_type=ValidationErrorType.CODE_JWT_GENERAL, + message=f"Error parsing JWT header: {session_token}", + original_exception=err, + ) + if header.get("alg") not in ALLOWED_ALGS: + raise TokenValidationException( + error_type=ValidationErrorType.CODE_JWT_INVALID_SIGNATURE, + message="Algorithm not allowed", + original_exception=InvalidAlgorithmError("Algorithm not allowed"), + ) + # ----------------------------------------- + # retrieve signing key try: signing_key: jwt.PyJWK = self._jwk_client.get_signing_key_from_jwt(token=session_token) diff --git a/tests/unit/test_session_service.py b/tests/unit/test_session_service.py index f738127..1764577 100644 --- a/tests/unit/test_session_service.py +++ b/tests/unit/test_session_service.py @@ -10,7 +10,6 @@ ImmatureSignatureError, InvalidAlgorithmError, InvalidSignatureError, - PyJWKClientError, encode, ) from pydantic import ValidationError @@ -129,8 +128,8 @@ def _provide_jwts(self): False, """eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6 IkpvaG4gRG9lIiwiYWRtaW4iOnRydWV9.dyt0CoTl4WoVjAHI9Q_CwSKhl6d_9rhM3NrXuJttkao""", - PyJWKClientError, - 'Unable to find a signing key that matches: "None"', + InvalidAlgorithmError, + "Algorithm not allowed", ), # Not before (nfb) in future ( @@ -180,6 +179,13 @@ def _provide_jwts(self): None, None, ), + # Disallowed algorithm "none" + ( + False, + self._generate_jwt(iss="https://auth.acme.com", exp=int(time()) + 100, nbf=int(time()) - 100, algorithm="none"), + InvalidAlgorithmError, + "Algorithm not allowed", + ), # Success with old Frontend API URL in config (2) ( True, @@ -194,20 +200,17 @@ def _provide_jwts(self): None, None, ), - # Disallowed algorithm "none" - ( - False, - "eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0." - "eyJpc3MiOiJodHRwczovL2F1dGguYWNtZS5jb20iLCJzdWIiOiIxMjM0NSIsImlhdCI6" - + str(int(time())) - + "f.", - InvalidAlgorithmError, - "Algorithm not allowed", - ), ] @classmethod - def _generate_jwt(cls, iss: str, exp: int, nbf: int, valid_key: bool = True) -> str: + def _generate_jwt( + cls, + iss: str, + exp: int, + nbf: int, + valid_key: bool = True, + algorithm: str = "RS256", + ) -> str: payload = { "iss": iss, "iat": int(time()), @@ -217,9 +220,24 @@ def _generate_jwt(cls, iss: str, exp: int, nbf: int, valid_key: bool = True) -> "name": TEST_NAME, } - if valid_key: - return encode(payload, key=cls.private_key, algorithm="RS256", headers={"kid": "kid123"}) - return encode(payload, key=cls.invalid_private_key, algorithm="RS256", headers={"kid": "kid123"}) + key_to_use = cls.private_key if valid_key else cls.invalid_private_key + + # unsecured JWT (“none”) + if algorithm.lower() == "none": + # key must be None for alg=none + return encode( + payload, + key=None, + headers={"alg": "none", "typ": "JWT"}, + ) + + # signed JWT (RS256 by default) + return encode( + payload, + key=key_to_use, + algorithm=algorithm, + headers={"kid": "kid123"}, + ) class TestSessionService(TestBase): From 43e4b22ab97aab2a2b0f8181426733a28387a2c2 Mon Sep 17 00:00:00 2001 From: Dopeamin Date: Wed, 11 Jun 2025 16:05:41 +0200 Subject: [PATCH 4/8] Refactor JWT validation in SessionService by removing pre-flight algorithm checks and updating error handling. Adjust tests to reflect changes in algorithm validation, specifically for PyJWKClientError. --- .../services/implementation/session_service.py | 18 ------------------ tests/unit/test_session_service.py | 9 +++++---- 2 files changed, 5 insertions(+), 22 deletions(-) diff --git a/src/corbado_python_sdk/services/implementation/session_service.py b/src/corbado_python_sdk/services/implementation/session_service.py index c618177..7db7a04 100644 --- a/src/corbado_python_sdk/services/implementation/session_service.py +++ b/src/corbado_python_sdk/services/implementation/session_service.py @@ -1,6 +1,5 @@ import jwt from jwt import ( - get_unverified_header, # ← added ExpiredSignatureError, ImmatureSignatureError, InvalidAlgorithmError, @@ -81,23 +80,6 @@ def validate_token(self, session_token: StrictStr) -> UserEntity: message=ValidationErrorType.CODE_JWT_EMPTY_SESSION_TOKEN.name, ) - # ---- pre-flight alg rejection ---- - try: - header = get_unverified_header(session_token) - except Exception as err: - raise TokenValidationException( - error_type=ValidationErrorType.CODE_JWT_GENERAL, - message=f"Error parsing JWT header: {session_token}", - original_exception=err, - ) - if header.get("alg") not in ALLOWED_ALGS: - raise TokenValidationException( - error_type=ValidationErrorType.CODE_JWT_INVALID_SIGNATURE, - message="Algorithm not allowed", - original_exception=InvalidAlgorithmError("Algorithm not allowed"), - ) - # ----------------------------------------- - # retrieve signing key try: signing_key: jwt.PyJWK = self._jwk_client.get_signing_key_from_jwt(token=session_token) diff --git a/tests/unit/test_session_service.py b/tests/unit/test_session_service.py index 1764577..7a2e1fc 100644 --- a/tests/unit/test_session_service.py +++ b/tests/unit/test_session_service.py @@ -10,6 +10,7 @@ ImmatureSignatureError, InvalidAlgorithmError, InvalidSignatureError, + PyJWKClientError, encode, ) from pydantic import ValidationError @@ -128,8 +129,8 @@ def _provide_jwts(self): False, """eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6 IkpvaG4gRG9lIiwiYWRtaW4iOnRydWV9.dyt0CoTl4WoVjAHI9Q_CwSKhl6d_9rhM3NrXuJttkao""", - InvalidAlgorithmError, - "Algorithm not allowed", + PyJWKClientError, + 'Unable to find a signing key that matches: "None"', ), # Not before (nfb) in future ( @@ -183,8 +184,8 @@ def _provide_jwts(self): ( False, self._generate_jwt(iss="https://auth.acme.com", exp=int(time()) + 100, nbf=int(time()) - 100, algorithm="none"), - InvalidAlgorithmError, - "Algorithm not allowed", + PyJWKClientError, + 'Unable to find a signing key that matches: "None"', ), # Success with old Frontend API URL in config (2) ( From 8ec7d98e5fbb1a5b3b6e75552e93044c5721612f Mon Sep 17 00:00:00 2001 From: Dopeamin Date: Wed, 11 Jun 2025 16:05:45 +0200 Subject: [PATCH 5/8] Remove unused InvalidAlgorithmError import from session service tests to streamline error handling and improve code clarity. --- tests/unit/test_session_service.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/test_session_service.py b/tests/unit/test_session_service.py index 7a2e1fc..b10dd6b 100644 --- a/tests/unit/test_session_service.py +++ b/tests/unit/test_session_service.py @@ -8,7 +8,6 @@ DecodeError, ExpiredSignatureError, ImmatureSignatureError, - InvalidAlgorithmError, InvalidSignatureError, PyJWKClientError, encode, From 1ca4d28eb940873f3936159f9379305da5bf73b4 Mon Sep 17 00:00:00 2001 From: Dopeamin Date: Thu, 12 Jun 2025 13:19:39 +0200 Subject: [PATCH 6/8] fixes the tests --- tests/unit/test_session_service.py | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/tests/unit/test_session_service.py b/tests/unit/test_session_service.py index b10dd6b..c5d1ec3 100644 --- a/tests/unit/test_session_service.py +++ b/tests/unit/test_session_service.py @@ -9,6 +9,7 @@ ExpiredSignatureError, ImmatureSignatureError, InvalidSignatureError, + InvalidAlgorithmError, PyJWKClientError, encode, ) @@ -182,9 +183,9 @@ def _provide_jwts(self): # Disallowed algorithm "none" ( False, - self._generate_jwt(iss="https://auth.acme.com", exp=int(time()) + 100, nbf=int(time()) - 100, algorithm="none"), - PyJWKClientError, - 'Unable to find a signing key that matches: "None"', + "eyJhbGciOiAibm9uZSIsICJ0eXAiOiAiSldUIiwgImtpZCI6ICJraWQxMjMifQ.eyJpc3MiOiAiaHR0cHM6Ly9hdXRoLmFjbWUuY29tIiwgInN1YiI6ICIxMjM0NSIsICJpYXQiOiAxNzQ5NzI2NjIxLCAiZXhwIjogMTc0OTczMDIyMSwgIm5iZiI6IDE3NDk3MjY2MjF9.", + InvalidAlgorithmError, + 'The specified alg value is not allowed', ), # Success with old Frontend API URL in config (2) ( @@ -209,7 +210,6 @@ def _generate_jwt( exp: int, nbf: int, valid_key: bool = True, - algorithm: str = "RS256", ) -> str: payload = { "iss": iss, @@ -222,20 +222,11 @@ def _generate_jwt( key_to_use = cls.private_key if valid_key else cls.invalid_private_key - # unsecured JWT (“none”) - if algorithm.lower() == "none": - # key must be None for alg=none - return encode( - payload, - key=None, - headers={"alg": "none", "typ": "JWT"}, - ) - # signed JWT (RS256 by default) return encode( payload, key=key_to_use, - algorithm=algorithm, + algorithm="RS256", headers={"kid": "kid123"}, ) From 4d7b8b3eca48fd52b3346439fabc81118a929420 Mon Sep 17 00:00:00 2001 From: Dopeamin Date: Thu, 12 Jun 2025 13:28:41 +0200 Subject: [PATCH 7/8] Refactor _generate_jwt method in TestBase to streamline key selection logic for JWT encoding, improving readability and maintainability of unit tests. --- tests/unit/test_session_service.py | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/tests/unit/test_session_service.py b/tests/unit/test_session_service.py index c5d1ec3..baf1b89 100644 --- a/tests/unit/test_session_service.py +++ b/tests/unit/test_session_service.py @@ -204,13 +204,7 @@ def _provide_jwts(self): ] @classmethod - def _generate_jwt( - cls, - iss: str, - exp: int, - nbf: int, - valid_key: bool = True, - ) -> str: + def _generate_jwt(cls, iss: str, exp: int, nbf: int, valid_key: bool = True) -> str: payload = { "iss": iss, "iat": int(time()), @@ -220,15 +214,9 @@ def _generate_jwt( "name": TEST_NAME, } - key_to_use = cls.private_key if valid_key else cls.invalid_private_key - - # signed JWT (RS256 by default) - return encode( - payload, - key=key_to_use, - algorithm="RS256", - headers={"kid": "kid123"}, - ) + if valid_key: + return encode(payload, key=cls.private_key, algorithm="RS256", headers={"kid": "kid123"}) + return encode(payload, key=cls.invalid_private_key, algorithm="RS256", headers={"kid": "kid123"}) class TestSessionService(TestBase): From 7a6aed37ae141af390b961ca91075faaabd6935d Mon Sep 17 00:00:00 2001 From: Dopeamin Date: Fri, 13 Jun 2025 10:09:29 +0200 Subject: [PATCH 8/8] fixes linting --- tests/unit/test_session_service.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/unit/test_session_service.py b/tests/unit/test_session_service.py index baf1b89..6a001d5 100644 --- a/tests/unit/test_session_service.py +++ b/tests/unit/test_session_service.py @@ -8,8 +8,8 @@ DecodeError, ExpiredSignatureError, ImmatureSignatureError, - InvalidSignatureError, InvalidAlgorithmError, + InvalidSignatureError, PyJWKClientError, encode, ) @@ -127,8 +127,10 @@ def _provide_jwts(self): # JWT signed with wrong algorithm (HS256 instead of RS256) ( False, - """eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6 - IkpvaG4gRG9lIiwiYWRtaW4iOnRydWV9.dyt0CoTl4WoVjAHI9Q_CwSKhl6d_9rhM3NrXuJttkao""", + ( + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6" + "IkpvaG4gRG9lIiwiYWRtaW4iOnRydWV9.dyt0CoTl4WoVjAHI9Q_CwSKhl6d_9rhM3NrXuJttkao" + ), PyJWKClientError, 'Unable to find a signing key that matches: "None"', ), @@ -183,7 +185,8 @@ def _provide_jwts(self): # Disallowed algorithm "none" ( False, - "eyJhbGciOiAibm9uZSIsICJ0eXAiOiAiSldUIiwgImtpZCI6ICJraWQxMjMifQ.eyJpc3MiOiAiaHR0cHM6Ly9hdXRoLmFjbWUuY29tIiwgInN1YiI6ICIxMjM0NSIsICJpYXQiOiAxNzQ5NzI2NjIxLCAiZXhwIjogMTc0OTczMDIyMSwgIm5iZiI6IDE3NDk3MjY2MjF9.", + "eyJhbGciOiAibm9uZSIsICJ0eXAiOiAiSldUIiwgImtpZCI6ICJraWQxMjMifQ.eyJpc3MiOiAiaHR0cHM6" + "Ly9hdXRoLmFjbWUuY29tIiwgInN1YiI6ICIxMjM0NSIsICJpYXQiOiAxNzQ5NzI2NjIxLCAiZXhwIjogMTc0OTczMDIyMSwgIm5iZiI6IDE3NDk3MjY2MjF9.", InvalidAlgorithmError, 'The specified alg value is not allowed', ),