From 15c4f4f7d4f98f5dad5452b48a623bda8f32e260 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=89loi=20Rivard?= Date: Sun, 25 Jan 2026 16:22:59 +0100 Subject: [PATCH] refactor: use scim2-models native exceptions --- scim2_server/backend.py | 14 ++++----- scim2_server/filter.py | 5 ++-- scim2_server/operators.py | 37 +++++++++++++----------- scim2_server/provider.py | 4 +-- scim2_server/utils.py | 29 ++++++++----------- tests/test_operators.py | 60 +++++++++++++++++++++------------------ tests/test_patch.py | 4 +-- tests/test_utils.py | 24 +++++++++------- uv.lock | 2 +- 9 files changed, 91 insertions(+), 88 deletions(-) diff --git a/scim2_server/backend.py b/scim2_server/backend.py index 5b36c26..d6e0b9a 100644 --- a/scim2_server/backend.py +++ b/scim2_server/backend.py @@ -11,7 +11,6 @@ from scim2_models import Attribute from scim2_models import BaseModel from scim2_models import CaseExact -from scim2_models import Error from scim2_models import Extension from scim2_models import Meta from scim2_models import Resource @@ -19,11 +18,11 @@ from scim2_models import Schema from scim2_models import SearchRequest from scim2_models import Uniqueness +from scim2_models import UniquenessException from werkzeug.http import generate_etag from scim2_server.filter import evaluate_filter from scim2_server.operators import ResolveSortOperator -from scim2_server.utils import SCIMException from scim2_server.utils import get_by_alias @@ -120,10 +119,9 @@ def query_resources( Resources. The List must contain a copy of resources. Mutating elements in the List must not modify the data stored in the backend. - :raises SCIMException: If the backend only supports querying for - one resource type at a time, setting resource_type_id to - None the backend may raise a - SCIMException(Error.make_too_many_error()). + :raises TooManyException: If the backend only supports querying + for one resource type at a time, setting resource_type_id to + None the backend may raise TooManyException. """ raise NotImplementedError @@ -353,7 +351,7 @@ def create_resource( if existing_resource.meta.resource_type == resource_type_id: existing_value = unique_attribute.get_attribute(existing_resource) if existing_value == new_value: - raise SCIMException(Error.make_uniqueness_error()) + raise UniquenessException() self.resources.append(resource) return resource @@ -392,7 +390,7 @@ def update_resource( existing_resource ) if existing_value == new_value: - raise SCIMException(Error.make_uniqueness_error()) + raise UniquenessException() self.resources[found_res_idx] = updated_resource return updated_resource diff --git a/scim2_server/filter.py b/scim2_server/filter.py index 635126a..5315dab 100644 --- a/scim2_server/filter.py +++ b/scim2_server/filter.py @@ -3,9 +3,8 @@ from scim2_filter_parser import ast as scim2ast from scim2_models import BaseModel from scim2_models import CaseExact -from scim2_models import Error +from scim2_models import InvalidFilterException -from scim2_server.utils import SCIMException from scim2_server.utils import get_by_alias from scim2_server.utils import parse_new_value @@ -125,4 +124,4 @@ def check_comparable_value(value): status code 400) with "scimType" of "invalidFilter"." """ if isinstance(value, bytes | bool | NoneType): - raise SCIMException(Error.make_invalid_filter_error()) + raise InvalidFilterException() diff --git a/scim2_server/operators.py b/scim2_server/operators.py index 5100bb1..3df8943 100644 --- a/scim2_server/operators.py +++ b/scim2_server/operators.py @@ -5,15 +5,18 @@ from scim2_filter_parser.parser import SCIMParser from scim2_models import BaseModel from scim2_models import CaseExact -from scim2_models import Error +from scim2_models import InvalidPathException +from scim2_models import InvalidValueException from scim2_models import Mutability +from scim2_models import MutabilityException +from scim2_models import NoTargetException from scim2_models import PatchOperation from scim2_models import Required from scim2_models import Resource from scim2_models import Returned +from scim2_models import SensitiveException from scim2_server.filter import evaluate_filter -from scim2_server.utils import SCIMException from scim2_server.utils import get_by_alias from scim2_server.utils import get_or_create from scim2_server.utils import handle_extension @@ -55,7 +58,7 @@ def parse_attribute_path(attribute_path: str | None) -> dict[str, Any] | None: match = ATTRIBUTE_PATH_REGEX.match(attribute_path) if not match: - raise SCIMException(Error.make_invalid_path_error()) + raise InvalidPathException() parse_attribute_path.cache[attribute_path] = match.groupdict() return match.groupdict() @@ -148,7 +151,7 @@ def match_multi_valued_attribute_sub( attribute_name = get_by_alias(type(model), attribute) multi_valued_attribute = get_or_create(model, attribute_name, True) if not isinstance(multi_valued_attribute, list): - raise SCIMException(Error.make_invalid_path_error()) + raise InvalidPathException() token_stream = SCIMLexer().tokenize(condition) condition = SCIMParser().parse(token_stream) self.init_return(model, attribute_name, sub_attribute, self.value) @@ -160,13 +163,13 @@ def match_multi_valued_attribute( self, attribute: str, condition: str, model: BaseModel ): if self.REQUIRES_VALUE and not isinstance(self.value, dict): - raise SCIMException(Error.make_invalid_value_error()) + raise InvalidValueException() attribute_name = get_by_alias(type(model), attribute) multi_valued_attribute = get_or_create( model, attribute_name, self.REQUIRES_VALUE ) if not isinstance(multi_valued_attribute, list): - raise SCIMException(Error.make_invalid_path_error()) + raise InvalidPathException() token_stream = SCIMLexer().tokenize(condition) condition = SCIMParser().parse(token_stream) if self.REQUIRES_VALUE: @@ -196,7 +199,7 @@ def match_complex_attribute(self, attribute: str, model: BaseModel, sub_path: st self.match_attribute(sub_path, value) else: if not isinstance(complex_attribute, BaseModel): - raise SCIMException(Error.make_invalid_path_error()) + raise InvalidPathException() self.match_attribute(sub_path, complex_attribute) def match_attribute(self, attribute: str, model: BaseModel): @@ -205,9 +208,9 @@ def match_attribute(self, attribute: str, model: BaseModel): def call_on_root(self, model: Resource): if not self.OPERATE_ON_ROOT: - raise SCIMException(Error.make_no_target_error()) + raise NoTargetException() if not isinstance(self.value, dict): - raise SCIMException(Error.make_invalid_value_error()) + raise InvalidValueException() for k, v in self.value.items(): ext, scim_name = handle_extension(model, k) if ext == model: @@ -233,7 +236,7 @@ def operation(cls, model: BaseModel, attribute: str, value: Any): return if model.get_field_annotation(alias, Mutability) == Mutability.read_only: - raise SCIMException(Error.make_mutability_error()) + raise MutabilityException() if model.get_field_multiplicity(alias): if getattr(model, alias) is None: @@ -247,7 +250,7 @@ def operation(cls, model: BaseModel, attribute: str, value: Any): model.get_field_annotation(alias, Required) == Required.true and not new_value ): - raise SCIMException(Error.make_invalid_value_error()) + raise InvalidValueException() setattr(model, alias, new_value) @@ -268,10 +271,10 @@ def operation(cls, model: BaseModel, attribute: str, value: Any): Mutability.read_only, Mutability.immutable, ): - raise SCIMException(Error.make_mutability_error()) + raise MutabilityException() if model.get_field_annotation(alias, Required) == Required.true: - raise SCIMException(Error.make_invalid_value_error()) + raise InvalidValueException() setattr(model, alias, None) @@ -283,7 +286,7 @@ class ReplaceOperator(Operator): def operation(cls, model: BaseModel, attribute: str, value: Any): alias = get_by_alias(type(model), attribute) if model.get_field_multiplicity(alias) and not isinstance(value, list): - raise SCIMException(Error.make_invalid_value_error()) + raise InvalidValueException() existing_value = getattr(model, alias) new_value = parse_new_value(model, alias, value) @@ -291,13 +294,13 @@ def operation(cls, model: BaseModel, attribute: str, value: Any): return if model.get_field_annotation(alias, Mutability) == Mutability.read_only: - raise SCIMException(Error.make_mutability_error()) + raise MutabilityException() if ( model.get_field_annotation(alias, Required) == Required.true and not new_value ): - raise SCIMException(Error.make_invalid_value_error()) + raise InvalidValueException() setattr(model, alias, new_value) @@ -372,7 +375,7 @@ def init_return( model.get_field_annotation(alias, Mutability) == Mutability.write_only or model.get_field_annotation(alias, Returned) == Returned.never ): - raise SCIMException(Error.make_sensitive_error()) + raise SensitiveException() @classmethod def operation( diff --git a/scim2_server/provider.py b/scim2_server/provider.py index 702b121..0e82c9a 100644 --- a/scim2_server/provider.py +++ b/scim2_server/provider.py @@ -20,6 +20,7 @@ from scim2_models import Resource from scim2_models import ResourceType from scim2_models import Schema +from scim2_models import SCIMException from scim2_models import SearchRequest from scim2_models import ServiceProviderConfig from scim2_models import Sort @@ -39,7 +40,6 @@ from scim2_server.backend import Backend from scim2_server.operators import patch_resource -from scim2_server.utils import SCIMException from scim2_server.utils import merge_resources @@ -537,7 +537,7 @@ def wsgi_app(self, request: Request, environ): return self.make_error(Error(status=e.code, detail=e.description)) except SCIMException as e: self.log.exception(e) - return self.make_error(e.scim_error) + return self.make_error(e.to_error()) except ValidationError as e: self.log.exception(e) return self.make_error(Error(status=400, detail=str(e))) diff --git a/scim2_server/utils.py b/scim2_server/utils.py index 4540984..626d897 100644 --- a/scim2_server/utils.py +++ b/scim2_server/utils.py @@ -8,21 +8,16 @@ from pydantic import EmailStr from pydantic import ValidationError from scim2_models import BaseModel -from scim2_models import Error from scim2_models import Extension +from scim2_models import InvalidValueException from scim2_models import Mutability +from scim2_models import MutabilityException +from scim2_models import NoTargetException from scim2_models import Resource from scim2_models import ResourceType from scim2_models import Schema -class SCIMException(Exception): - """A wrapper class, because an "Error" does not inherit from Exception and should not be raised.""" - - def __init__(self, scim_error: Error): - self.scim_error = scim_error - - def load_json_resource(json_name: str) -> list: """Load a JSON document from the scim2_server package resources.""" fp = importlib.resources.files("scim2_server") / "resources" / json_name @@ -69,7 +64,7 @@ def merge_resources(target: Resource, updates: BaseModel): if mutability == Mutability.immutable and getattr( target, set_attribute ) not in (None, new_value): - raise SCIMException(Error.make_mutability_error()) + raise MutabilityException() setattr(target, set_attribute, new_value) @@ -82,8 +77,8 @@ def get_by_alias( :param scim_name: SCIM attribute name :param allow_none: Allow returning None if attribute is not found :return: pydantic attribute name - :raises SCIMException: If no attribute is found and allow_none is - False + :raises NoTargetException: If no attribute is found and allow_none + is False """ try: return next( @@ -94,7 +89,7 @@ def get_by_alias( except StopIteration as e: if allow_none: return None - raise SCIMException(Error.make_no_target_error()) from e + raise NoTargetException() from e def get_or_create( @@ -107,7 +102,7 @@ def get_or_create( :param check_mutability: If True, validate that the attribute is mutable :return: A complex attribute model - :raises SCIMException: If attribute is not mutable and + :raises MutabilityException: If attribute is not mutable and check_mutability is True """ if check_mutability: @@ -115,7 +110,7 @@ def get_or_create( Mutability.read_only, Mutability.immutable, ): - raise SCIMException(Error.make_mutability_error()) + raise MutabilityException() ret = getattr(model, attribute_name, None) if not ret: if model.get_field_multiplicity(attribute_name): @@ -164,8 +159,8 @@ def model_validate_from_dict(field_root_type: type[BaseModel], value: dict) -> A def parse_new_value(model: BaseModel, attribute_name: str, value: Any) -> Any: """Given a model and attribute name, attempt to parse a new value so that the type matches the type expected by the model. - :raises SCIMException: If attribute can not be mapped to the - required type + :raises InvalidValueException: If attribute can not be mapped to + the required type """ field_root_type = model.get_field_root_type(attribute_name) try: @@ -195,5 +190,5 @@ def parse_new_value(model: BaseModel, attribute_name: str, value: Any) -> Any: else: new_value = field_root_type(value) except (AttributeError, TypeError, ValueError, ValidationError) as e: - raise SCIMException(Error.make_invalid_value_error()) from e + raise InvalidValueException() from e return new_value diff --git a/tests/test_operators.py b/tests/test_operators.py index ee3d34f..6a9ae05 100644 --- a/tests/test_operators.py +++ b/tests/test_operators.py @@ -2,12 +2,17 @@ import datetime import pytest +from scim2_models import URN from scim2_models import BaseModel from scim2_models import CaseExact from scim2_models import Email from scim2_models import EnterpriseUser from scim2_models import GroupMembership +from scim2_models import InvalidPathException +from scim2_models import InvalidValueException +from scim2_models import MutabilityException from scim2_models import Name +from scim2_models import NoTargetException from scim2_models import Resource from scim2_models import User from scim2_models import X509Certificate @@ -19,7 +24,6 @@ from scim2_server.operators import ResolveResult from scim2_server.operators import ResolveSortOperator from scim2_server.operators import parse_attribute_path -from scim2_server.utils import SCIMException class TestOperators: @@ -45,11 +49,11 @@ def test_path_resolving(self): "condition": "x eq 5", "sub_attribute": "b", } - with pytest.raises(SCIMException, match="invalidPath"): + with pytest.raises(InvalidPathException): assert parse_attribute_path("%invalid$$path") - with pytest.raises(SCIMException, match="invalidPath"): + with pytest.raises(InvalidPathException): assert parse_attribute_path(".a..b[x]") - with pytest.raises(SCIMException, match="invalidPath"): + with pytest.raises(InvalidPathException): assert parse_attribute_path("\\x") def test_simple_add_operator_unset_single_valued(self): @@ -84,7 +88,7 @@ def test_simple_add_operator_multi_valued_primary(self): def test_simple_add_operator_immutable(self): u = User() - with pytest.raises(SCIMException, match="mutability"): + with pytest.raises(MutabilityException): AddOperator.operation(u, "id", "123") u.id = "123" AddOperator.operation(u, "id", "123") @@ -92,11 +96,11 @@ def test_simple_add_operator_immutable(self): def test_simple_add_operator_invalid_value(self): u = User() - with pytest.raises(SCIMException, match="invalidValue"): + with pytest.raises(InvalidValueException): AddOperator.operation(u, "userName", {}) - with pytest.raises(SCIMException, match="invalidValue"): + with pytest.raises(InvalidValueException): AddOperator.operation(u, "emails", "abc") - with pytest.raises(SCIMException, match="invalidValue"): + with pytest.raises(InvalidValueException): AddOperator.operation(u, "emails", {"foo": 123}) def test_simple_add_operator_bool_value_parsing(self): @@ -136,7 +140,7 @@ def test_add_operator_fully_qualified_attribute_name(self): def test_add_operator_overwrite_required_attribute(self): u = User(user_name="A") - with pytest.raises(SCIMException, match="invalidValue"): + with pytest.raises(InvalidValueException): AddOperator("userName", "")(u) def test_add_operator_multi_valued_list(self): @@ -157,11 +161,11 @@ def test_add_operator_multi_valued_list(self): def test_add_operator_root_object_invalid_value(self): u = User() - with pytest.raises(SCIMException, match="invalidValue"): + with pytest.raises(InvalidValueException): AddOperator(None, "abc")(u) - with pytest.raises(SCIMException, match="invalidValue"): + with pytest.raises(InvalidValueException): AddOperator(None, 1)(u) - with pytest.raises(SCIMException, match="invalidValue"): + with pytest.raises(InvalidValueException): AddOperator(None, [1, 2, 3])(u) def test_add_operator_simple_attribute(self): @@ -179,23 +183,23 @@ def test_add_operator_complex_attribute(self): def test_add_operator_complex_attribute_invalid_path(self): u = User() - with pytest.raises(SCIMException, match="invalidValue"): + with pytest.raises(InvalidValueException): AddOperator('name[givenName eq "Foo"]', "foo")(u) - with pytest.raises(SCIMException, match="invalidPath"): + with pytest.raises(InvalidPathException): AddOperator('name[givenName eq "Foo"].formatted', "foo")(u) - with pytest.raises(SCIMException, match="invalidPath"): + with pytest.raises(InvalidPathException): AddOperator('name[givenName eq "Foo"]', {"formatted": "foo"})(u) def test_add_operator_multi_valued_attribute_invalid_path(self): u = User() - with pytest.raises(SCIMException, match="invalidPath"): + with pytest.raises(InvalidPathException): AddOperator("emails.value", "work@example.com")(u) def test_add_operator_multi_valued_attribute_mutability(self): u = User() - with pytest.raises(SCIMException, match="immutable"): + with pytest.raises(MutabilityException): AddOperator( "groups", { @@ -264,7 +268,7 @@ def test_add_operator_multi_valued_complex_attribute_value_only(self): assert u.emails == [ Email(value="work@example.com"), ] - with pytest.raises(SCIMException, match="invalidValue"): + with pytest.raises(InvalidValueException): AddOperator("name", "Mr. Foo")(u) def test_add_operator_binary_data(self): @@ -278,7 +282,7 @@ def test_add_operator_binary_data(self): def test_add_operator_datetime(self): class Foo(Resource): - schemas: list[str] = ["urn:example:2.0:Foo"] + __schema__ = URN("urn:example:2.0:Foo") dt: datetime.datetime | None = None f = Foo() @@ -401,7 +405,7 @@ def test_simple_replace_operator_multi_valued(self): def test_simple_replace_operator_multi_valued_invalid_value(self): u = User(emails=[]) - with pytest.raises(SCIMException, match="invalidValue"): + with pytest.raises(InvalidValueException): ReplaceOperator.operation( u, "emails", @@ -410,7 +414,7 @@ def test_simple_replace_operator_multi_valued_invalid_value(self): def test_simple_replace_operator_immutable(self): u = User() - with pytest.raises(SCIMException, match="mutability"): + with pytest.raises(MutabilityException): ReplaceOperator.operation(u, "id", "123") u.id = "123" ReplaceOperator.operation(u, "id", "123") @@ -441,7 +445,7 @@ def test_replace_operator_extension_root_object(self): def test_replace_operator_overwrite_required_attribute(self): u = User(user_name="A") - with pytest.raises(SCIMException, match="invalidValue"): + with pytest.raises(InvalidValueException): ReplaceOperator("userName", "")(u) def test_simple_remove_operator_unset_single_valued(self): @@ -463,19 +467,19 @@ def test_simple_remove_operator_multi_valued(self): def test_simple_remove_operator_immutable(self): u = User(id="123", groups=[GroupMembership()]) - with pytest.raises(SCIMException, match="immutable"): + with pytest.raises(MutabilityException): RemoveOperator.operation(u, "id", None) - with pytest.raises(SCIMException, match="immutable"): + with pytest.raises(MutabilityException): RemoveOperator.operation(u, "groups", None) def test_remove_operator_root_object(self): u = User() - with pytest.raises(SCIMException, match="noTarget"): + with pytest.raises(NoTargetException): RemoveOperator("", None)(u) def test_remove_operator_required_attribute(self): u = User(user_name="A") - with pytest.raises(SCIMException, match="invalidValue"): + with pytest.raises(InvalidValueException): RemoveOperator("userName", None)(u) def test_remove_operator_complex_attribute(self): @@ -516,7 +520,7 @@ def test_remove_operator_multi_valued_filter_sub_attribute(self): def test_remove_operator_extension_root(self): u = User[EnterpriseUser]() u.EnterpriseUser = EnterpriseUser(employee_number="123") - with pytest.raises(SCIMException, match="noTarget"): + with pytest.raises(NoTargetException): RemoveOperator( "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User", None )(u) @@ -542,7 +546,7 @@ def test_resolve_operator_unset_simple_attribute(self): u = User() assert self._resolve_value("userName", u) is None assert self._resolve_value("active", u) is None - with pytest.raises(SCIMException, match="noTarget"): + with pytest.raises(NoTargetException): self._resolve_value("invalidAttribute", u) def test_resolve_operator_simple_attribute(self): diff --git a/tests/test_patch.py b/tests/test_patch.py index e46a34a..afd551d 100644 --- a/tests/test_patch.py +++ b/tests/test_patch.py @@ -1,8 +1,8 @@ import pytest +from scim2_models import MutabilityException from scim2_models import PatchOperation from scim2_server.operators import patch_resource -from scim2_server.utils import SCIMException class TestPatch: @@ -42,7 +42,7 @@ def test_patch_operation_add_simple(self, provider): "id": "123", "userName": "Bar", } - with pytest.raises(SCIMException, match="mutability"): + with pytest.raises(MutabilityException): patch_resource( user, PatchOperation( diff --git a/tests/test_utils.py b/tests/test_utils.py index 4b56fd1..3929e8f 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -3,17 +3,21 @@ import pytest from scim2_filter_parser.lexer import SCIMLexer from scim2_filter_parser.parser import SCIMParser +from scim2_models import URN from scim2_models import Context from scim2_models import EnterpriseUser +from scim2_models import InvalidFilterException from scim2_models import Meta from scim2_models import Mutability +from scim2_models import MutabilityException from scim2_models import Name +from scim2_models import NoTargetException from scim2_models import Resource +from scim2_models import SensitiveException from scim2_models import User from scim2_server.filter import evaluate_filter from scim2_server.operators import ResolveOperator -from scim2_server.utils import SCIMException from scim2_server.utils import get_or_create from scim2_server.utils import merge_resources @@ -110,7 +114,7 @@ def evaluate(filter_str: str) -> bool: assert evaluate('meta.created eq "2024-08-01T12:40:00Z"') assert evaluate('meta.created lt "2025-01-01T00:00:00Z"') - with pytest.raises(SCIMException, match="sensitive"): + with pytest.raises(SensitiveException): # Password is sensitive (never returned), filter must not match # to not reveal any information about the value assert not evaluate('password sw "(Rtk_"') @@ -122,19 +126,19 @@ def evaluate(filter_str: str) -> bool: assert evaluate('emails[value pr].type co "home"') assert not evaluate('emails[value pr].value co "x@example.com"') - with pytest.raises(SCIMException, match="noTarget"): + with pytest.raises(NoTargetException): assert not evaluate('name[type eq "foo"]') - with pytest.raises(SCIMException, match="invalidFilter"): + with pytest.raises(InvalidFilterException): assert not evaluate("active gt 5") - with pytest.raises(SCIMException, match="invalidFilter"): + with pytest.raises(InvalidFilterException): assert not evaluate("active lt 5") - with pytest.raises(SCIMException, match="invalidFilter"): + with pytest.raises(InvalidFilterException): assert not evaluate("active ge 5") - with pytest.raises(SCIMException, match="invalidFilter"): + with pytest.raises(InvalidFilterException): assert not evaluate("active le 5") # Filters based on examples from RFC 7644, Section 3.4.2.2 @@ -354,19 +358,19 @@ def test_dump_extension(self, provider): def test_merge_resources_immutable(self): class Foo(Resource): - schemas: list[str] = ["urn:example:2.0:Foo"] + __schema__ = URN("urn:example:2.0:Foo") immutable_string: Annotated[str | None, Mutability.immutable] = None stored = Foo() merge_resources(stored, Foo(immutable_string="ABC")) assert stored.immutable_string == "ABC" merge_resources(stored, Foo(immutable_string="ABC")) - with pytest.raises(SCIMException, match="mutability"): + with pytest.raises(MutabilityException): merge_resources(stored, Foo(immutable_string="D")) def test_get_or_create_mutability(self): u = User() - with pytest.raises(SCIMException, match="immutable"): + with pytest.raises(MutabilityException): get_or_create(u, "groups", True) def test_merge_resources_none_extension(self): diff --git a/uv.lock b/uv.lock index 72d45f9..d42d444 100644 --- a/uv.lock +++ b/uv.lock @@ -650,7 +650,7 @@ wheels = [ [[package]] name = "scim2-server" -version = "0.1.7" +version = "0.1.8" source = { editable = "." } dependencies = [ { name = "scim2-filter-parser" },