diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e17ef2a..07a04204 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,34 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). --- +## v26.06.75 (2026-06-07) + +### Changed (data — unified `@transactional` for every backend) + +There is now **one** `@transactional` annotation for both relational and document services, +imported from `pyfly.data` (Spring's uniform-annotation model). It dispatches at call time to the +transaction manager the service exposes: + +- relational `async_sessionmaker` on `self._session_factory` → SQLAlchemy transaction (propagation, + isolation, read-only, `rollback_for`/`no_rollback_for`, repository session patching); +- MongoDB client on `self._motor_client` → Mongo session + transaction (session injected as the + `session` kwarg, commit/abort with `rollback_for`/`no_rollback_for` parity). + +The decorator is backend-neutral (`pyfly.data.transactional`) and imports neither SQLAlchemy nor +Motor at module scope; each backend supplies a lazily-imported runner (`run_relational_transaction` +/ `run_mongo_transaction`). `from pyfly.data.relational.sqlalchemy import transactional` still works +(same object); **`mongo_transactional` is now a deprecated alias** of `@transactional`. + +### Fixed / tested + +- The MongoDB transaction path — previously **completely untested** — now has behavior tests + (commit, abort on `rollback_for`, commit-and-re-raise on `no_rollback_for`, session injection, + missing-client error) and gained `rollback_for`/`no_rollback_for` parity with relational. +- Relational isolation is now verified (a test asserts the `isolation_level` execution option + actually reaches the session — previously only the enum/metadata were checked). +- Docs (`data.md`, `data-document.md`) + the companion `implement-data-repository` skill updated to + teach the single annotation; the doc's broken `@mongo_transactional(client)` example is corrected. + ## v26.06.74 (2026-06-07) ### Docs (data — query-mechanism decision guide + Spring-Data parity matrix) diff --git a/README.md b/README.md index 58d2dc5b..90a548b2 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Firefly Framework Python 3.12+ License: Apache 2.0 - Version: 26.06.74 + Version: 26.06.75 Type Checked: mypy strict Code Style: Ruff Async First diff --git a/docs/modules/data-document.md b/docs/modules/data-document.md index 418ce46f..ba3ceebd 100644 --- a/docs/modules/data-document.md +++ b/docs/modules/data-document.md @@ -797,44 +797,48 @@ Source files: ## Transaction Management -### mongo_transactional Decorator +### The unified `@transactional` decorator -The `@mongo_transactional` decorator provides declarative async transaction management for MongoDB, mirroring the `@reactive_transactional` decorator from the SQLAlchemy adapter: +MongoDB uses the **same** `@transactional` annotation as the relational adapter — import it from +`pyfly.data`. On a service exposing a Motor client as `self._motor_client`, it opens a Mongo +session + transaction, injects the session as the `session` keyword argument, commits on success, +and aborts on error (honouring `rollback_for` / `no_rollback_for`, like the relational backend): ```python -from pyfly.data.document.mongodb import mongo_transactional -from motor.motor_asyncio import AsyncIOMotorClient - -client: AsyncIOMotorClient = ... - - -@mongo_transactional(client) -async def transfer_funds(from_id: str, to_id: str, amount: float) -> None: - from_account = await AccountDocument.get(from_id) - to_account = await AccountDocument.get(to_id) +from pyfly.container import service +from pyfly.data import transactional - from_account.balance -= amount - to_account.balance += amount - await from_account.save() - await to_account.save() - # Transaction is committed automatically on success - # Transaction is aborted automatically on exception +@service +class AccountService: + def __init__(self, motor_client) -> None: + self._motor_client = motor_client # selects the MongoDB transaction manager + + @transactional() + async def transfer_funds(self, from_id: str, to_id: str, amount: float, *, session=None) -> None: + from_account = await AccountDocument.get(from_id) + to_account = await AccountDocument.get(to_id) + from_account.balance -= amount + to_account.balance += amount + await from_account.save() + await to_account.save() + # Committed automatically on success; aborted automatically on exception. ``` **How it works:** -1. Starts a new Motor session via `client.start_session()`. -2. Begins a transaction on the session via `session.start_transaction()`. -3. Calls the wrapped function with all original arguments. -4. On success: the transaction is committed (via the `async with` context manager). -5. On exception: the transaction is aborted and the exception is re-raised. +1. Resolves the Motor client from `self._motor_client`. +2. Opens a session (`client.start_session()`) and a transaction (`session.start_transaction()`). +3. Injects the live `ClientSession` as the `session` keyword argument and calls the function. +4. On success the transaction commits; on an exception in `rollback_for` it aborts; an exception + in `no_rollback_for` commits and is then re-raised (mirroring the relational semantics). -Unlike `@reactive_transactional` (which injects the session as the first argument), `@mongo_transactional` does not inject the session. The Beanie document operations automatically participate in the active transaction through Motor's session context. +> **Deprecated:** `from pyfly.data.document.mongodb import mongo_transactional` still works but is +> a thin alias of `@transactional`. Prefer `@transactional`. ### Replica Set Requirement -MongoDB transactions require a replica set deployment. Standalone MongoDB instances do not support multi-document transactions. If you attempt to use `@mongo_transactional` against a standalone instance, MongoDB will raise an error. +MongoDB transactions require a replica set deployment. Standalone MongoDB instances do not support multi-document transactions. If you attempt to use `@transactional` (or the deprecated `@mongo_transactional`) against a standalone instance, MongoDB will raise an error. For local development, you can run a single-node replica set: diff --git a/docs/modules/data.md b/docs/modules/data.md index 54bed293..727de8f2 100644 --- a/docs/modules/data.md +++ b/docs/modules/data.md @@ -1076,7 +1076,7 @@ Some capabilities are **backend-specific** today: | Soft delete (`SoftDeleteRepository`) | ✅ | ❌ not yet | | Optimistic locking (`VersionedMixin` / `@Version`) | ✅ | ❌ not yet | | Auditing auto-population | ✅ `created/updated_at` **and** `created/updated_by` | ⚠️ timestamps at insert only | -| `@transactional` (propagation, isolation, read-only, `rollback_for`) | ✅ | ⚠️ basic commit/abort (`@mongo_transactional`, replica set required) | +| `@transactional` — one annotation, both backends (`pyfly.data`) | ✅ propagation / isolation / read-only / `rollback_for` | ✅ commit/abort + `rollback_for` (replica set; propagation/isolation are relational-only) | **Not yet implemented on either backend** (so you don't reach for them): `@Modifying`-style declarative bulk `UPDATE`/`DELETE`; `Slice` (count-less paging) and streaming/reactive result diff --git a/pyproject.toml b/pyproject.toml index bcca7087..7a5b5048 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name = "pyfly" # CalVer YY.MM.PATCH — package metadata uses PEP 440 normalized form (26.5.4); # git tag, GitHub release and human-readable display use leading-zero form # (v26.05.04) to match the Java/.NET/Go siblings. -version = "26.6.74" +version = "26.6.75" description = "The official Python implementation of the Firefly Framework — DI, CQRS, EDA, hexagonal architecture, and more." readme = "README.md" license = "Apache-2.0" diff --git a/src/pyfly/__init__.py b/src/pyfly/__init__.py index 2972c1bf..1c4ac46e 100644 --- a/src/pyfly/__init__.py +++ b/src/pyfly/__init__.py @@ -13,4 +13,4 @@ # limitations under the License. """PyFly — Enterprise Python Framework.""" -__version__ = "26.06.74" +__version__ = "26.06.75" diff --git a/src/pyfly/data/__init__.py b/src/pyfly/data/__init__.py index f3670224..463d616c 100644 --- a/src/pyfly/data/__init__.py +++ b/src/pyfly/data/__init__.py @@ -36,27 +36,31 @@ from pyfly.data.query import query from pyfly.data.query_parser import QueryMethodParser from pyfly.data.specification import Specification +from pyfly.data.transactional import Isolation, Propagation, transactional __all__ = [ "BaseFilterUtils", "BaseRepositoryPostProcessor", "CrudRepository", "DERIVED_PREFIXES", + "Isolation", "Mapper", - "default_mapper", - "mapping", "Order", "Page", "Pageable", "PagingRepository", + "Propagation", "QueryMethodCompilerPort", "QueryMethodParser", "RepositoryPort", "SessionPort", "Sort", "Specification", + "default_mapper", "is_projection", + "mapping", "projection", "projection_fields", "query", + "transactional", ] diff --git a/src/pyfly/data/document/mongodb/transactional.py b/src/pyfly/data/document/mongodb/transactional.py index 4cbf4a03..ea5d6d34 100644 --- a/src/pyfly/data/document/mongodb/transactional.py +++ b/src/pyfly/data/document/mongodb/transactional.py @@ -11,51 +11,69 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""MongoDB transactional decorator — wraps async functions in a Mongo session + transaction.""" +"""MongoDB execution for the unified ``@transactional`` decorator. + +Use the backend-neutral ``@transactional`` from :mod:`pyfly.data` on document services too — it +dispatches here when the service exposes a ``_motor_client``. ``mongo_transactional`` is kept as a +**deprecated** alias of ``@transactional`` for backward compatibility. +""" from __future__ import annotations -import functools from collections.abc import Callable -from typing import Any, TypeVar - -F = TypeVar("F", bound=Callable[..., Any]) - +from typing import Any -def mongo_transactional(func: F) -> F: - """Wrap an async function in a MongoDB session and transaction. +from pyfly.data.transactional import transactional - The decorated function receives an extra ``session`` keyword argument - bound to the active ``ClientSession``. If the function completes - without error the transaction is committed; otherwise it is aborted. - Requires a :class:`motor.motor_asyncio.AsyncIOMotorClient` to be - resolvable from the first positional argument's ``_motor_client`` - attribute (typically ``self`` in a service bean). +async def run_mongo_transaction( + func: Callable[..., Any], + args: tuple[Any, ...], + kwargs: dict[str, Any], + *, + rollback_for: tuple[type[BaseException], ...] = (Exception,), + no_rollback_for: tuple[type[BaseException], ...] = (), +) -> Any: + """Execute *func* inside a MongoDB transaction (the document arm of ``@transactional``). - Example:: + Resolves the Motor client from ``self._motor_client``, opens a session + transaction, injects + the session as the ``session`` keyword argument, and commits on success. On error it aborts — + except for exception types in ``no_rollback_for`` (or any ``Exception`` not in ``rollback_for``), + which commit and then re-raise, mirroring the relational ``@transactional`` semantics. A + ``BaseException`` that is not an ``Exception`` (cancellation/shutdown) always aborts. + """ + self_arg = args[0] if args else None + motor_client = getattr(self_arg, "_motor_client", None) + if motor_client is None: + raise RuntimeError( + f"{func.__qualname__}: cannot resolve Motor client. Ensure the service has a '_motor_client' attribute." + ) - class OrderService: - def __init__(self, motor_client): - self._motor_client = motor_client + async with await motor_client.start_session() as session: + kwargs["session"] = session + # session.start_transaction() commits on a clean context exit and aborts when an exception + # escapes it. To honour no_rollback_for we let such exceptions exit cleanly (commit) and + # re-raise them afterwards, rather than driving abort/commit manually (which would depend + # on Motor's lazy-vs-eager start_transaction semantics). + deferred: BaseException | None = None + result: Any = None + async with session.start_transaction(): + try: + result = await func(*args, **kwargs) + except BaseException as exc: + if not isinstance(exc, Exception): + raise # cancellation/shutdown -> abort + if isinstance(exc, tuple(no_rollback_for)): + deferred = exc # commit, then surface + elif isinstance(exc, tuple(rollback_for)): + raise # -> abort + else: + deferred = exc # not in rollback_for -> commit, then surface + if deferred is not None: + raise deferred + return result - @mongo_transactional - async def place_order(self, order, *, session=None): - ... - """ - @functools.wraps(func) - async def wrapper(*args: Any, **kwargs: Any) -> Any: - # Resolve the Motor client from the first arg (self) - self_arg = args[0] if args else None - motor_client = getattr(self_arg, "_motor_client", None) - if motor_client is None: - raise RuntimeError( - f"{func.__qualname__}: cannot resolve Motor client. Ensure the service has a '_motor_client' attribute." - ) - - async with await motor_client.start_session() as session, session.start_transaction(): - kwargs["session"] = session - return await func(*args, **kwargs) - - return wrapper # type: ignore[return-value] +# Backward-compatibility alias. Prefer the unified `@transactional` from `pyfly.data`. +mongo_transactional = transactional +"""Deprecated alias of :func:`pyfly.data.transactional.transactional`. Use ``@transactional``.""" diff --git a/src/pyfly/data/relational/sqlalchemy/transactional.py b/src/pyfly/data/relational/sqlalchemy/transactional.py index 6a6b1bfb..b9da7a4e 100644 --- a/src/pyfly/data/relational/sqlalchemy/transactional.py +++ b/src/pyfly/data/relational/sqlalchemy/transactional.py @@ -11,12 +11,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Declarative transaction management decorator.""" +"""SQLAlchemy (relational) execution for the unified ``@transactional`` decorator. + +The public ``@transactional`` annotation is the backend-neutral one in +:mod:`pyfly.data.transactional` (re-exported here for backward compatibility). This module +provides the relational *runner* it dispatches to (``run_relational_transaction``) plus +``reactive_transactional`` for explicit-session use. ``Propagation`` / ``Isolation`` are +re-exported from the core module. +""" from __future__ import annotations import contextlib -import enum import functools from collections.abc import Callable from contextvars import ContextVar @@ -25,6 +31,15 @@ from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from pyfly.data.relational.routing import read_only as _read_only_scope +from pyfly.data.transactional import Isolation, Propagation, transactional + +__all__ = [ + "Isolation", + "Propagation", + "reactive_transactional", + "run_relational_transaction", + "transactional", +] F = TypeVar("F", bound=Callable[..., Any]) @@ -34,27 +49,6 @@ ) -class Propagation(enum.Enum): - """Transaction propagation behaviour.""" - - REQUIRED = "REQUIRED" - REQUIRES_NEW = "REQUIRES_NEW" - SUPPORTS = "SUPPORTS" - NOT_SUPPORTED = "NOT_SUPPORTED" - NEVER = "NEVER" - MANDATORY = "MANDATORY" - - -class Isolation(enum.Enum): - """Transaction isolation level.""" - - DEFAULT = "DEFAULT" - READ_UNCOMMITTED = "READ UNCOMMITTED" - READ_COMMITTED = "READ COMMITTED" - REPEATABLE_READ = "REPEATABLE READ" - SERIALIZABLE = "SERIALIZABLE" - - def _patch_repositories(self_arg: Any, session: AsyncSession) -> None: from pyfly.data.relational.sqlalchemy.repository import Repository @@ -73,112 +67,102 @@ def _resolve_session_factory(self_arg: Any) -> async_sessionmaker[AsyncSession] return factory -def transactional( +async def run_relational_transaction( + func: Callable[..., Any], + args: tuple[Any, ...], + kwargs: dict[str, Any], + *, propagation: Propagation = Propagation.REQUIRED, isolation: Isolation = Isolation.DEFAULT, read_only: bool = False, rollback_for: tuple[type[BaseException], ...] = (Exception,), no_rollback_for: tuple[type[BaseException], ...] = (), -) -> Callable[[F], F]: - """DI-aware declarative transaction management decorator. +) -> Any: + """Execute *func* inside a SQLAlchemy transaction (the relational arm of ``@transactional``). - Resolves ``async_sessionmaker`` from ``self._session_factory`` and uses - a ``ContextVar`` for propagation semantics modelled after Spring's - ``@Transactional``. + Resolves ``async_sessionmaker`` from ``self._session_factory`` and uses a ``ContextVar`` for + propagation semantics modelled after Spring's ``@Transactional``. """ - - def decorator(func: F) -> F: - @functools.wraps(func) - async def wrapper(*args: Any, **kwargs: Any) -> Any: - self_arg = args[0] if args else None - existing: AsyncSession | None = _active_session_var.get() - - if propagation is Propagation.NEVER: - if existing is not None: - raise RuntimeError("Propagation.NEVER — active transaction exists") - return await func(*args, **kwargs) - - if propagation is Propagation.NOT_SUPPORTED: - token = _active_session_var.set(None) - try: - return await func(*args, **kwargs) - finally: - _active_session_var.reset(token) - - if propagation is Propagation.SUPPORTS: - return await func(*args, **kwargs) - - if propagation is Propagation.MANDATORY: - if existing is None: - raise RuntimeError("Propagation.MANDATORY — no active transaction") - return await func(*args, **kwargs) - - if propagation is Propagation.REQUIRED and existing is not None: - return await func(*args, **kwargs) - - session_factory = _resolve_session_factory(self_arg) if self_arg is not None else None - if session_factory is None: - raise RuntimeError( - "No _session_factory available on self — ensure the service has an injected async_sessionmaker" - ) - - execution_options: dict[str, Any] = {} - if isolation is not Isolation.DEFAULT: - execution_options["isolation_level"] = isolation.value - - # read_only routes to the replica when the session factory is a - # RoutingSessionFactory (pyfly.data.relational.routing) and flags the session. - ro_scope = _read_only_scope() if read_only else contextlib.nullcontext() - with ro_scope: - async with session_factory() as session: - if read_only: - session.info["read_only"] = True - if execution_options: - session = session.execution_options(**execution_options) # type: ignore[attr-defined] - - await session.begin() - token = _active_session_var.set(session) - if self_arg is not None: - _patch_repositories(self_arg, session) - try: - result = await func(*args, **kwargs) - await session.commit() - return result - except BaseException as exc: - # Never commit partial work on cancellation/shutdown: a BaseException - # that is not an Exception (asyncio.CancelledError, KeyboardInterrupt, - # SystemExit) always rolls back, regardless of rollback_for. - if not isinstance(exc, Exception): - await session.rollback() - elif isinstance(exc, tuple(no_rollback_for)): - await session.commit() - elif isinstance(exc, tuple(rollback_for)): - await session.rollback() - else: - await session.commit() - raise - finally: - _active_session_var.reset(token) - - wrapper.__pyfly_transactional__ = True # type: ignore[attr-defined] - wrapper.__pyfly_propagation__ = propagation # type: ignore[attr-defined] - wrapper.__pyfly_isolation__ = isolation # type: ignore[attr-defined] - - return wrapper # type: ignore[return-value] - - return decorator + self_arg = args[0] if args else None + existing: AsyncSession | None = _active_session_var.get() + + if propagation is Propagation.NEVER: + if existing is not None: + raise RuntimeError("Propagation.NEVER — active transaction exists") + return await func(*args, **kwargs) + + if propagation is Propagation.NOT_SUPPORTED: + token = _active_session_var.set(None) + try: + return await func(*args, **kwargs) + finally: + _active_session_var.reset(token) + + if propagation is Propagation.SUPPORTS: + return await func(*args, **kwargs) + + if propagation is Propagation.MANDATORY: + if existing is None: + raise RuntimeError("Propagation.MANDATORY — no active transaction") + return await func(*args, **kwargs) + + if propagation is Propagation.REQUIRED and existing is not None: + return await func(*args, **kwargs) + + session_factory = _resolve_session_factory(self_arg) if self_arg is not None else None + if session_factory is None: + raise RuntimeError( + "No _session_factory available on self — ensure the service has an injected async_sessionmaker" + ) + + execution_options: dict[str, Any] = {} + if isolation is not Isolation.DEFAULT: + execution_options["isolation_level"] = isolation.value + + # read_only routes to the replica when the session factory is a RoutingSessionFactory + # (pyfly.data.relational.routing) and flags the session. + ro_scope = _read_only_scope() if read_only else contextlib.nullcontext() + with ro_scope: + async with session_factory() as session: + if read_only: + session.info["read_only"] = True + if execution_options: + session = session.execution_options(**execution_options) # type: ignore[attr-defined] + await session.begin() + token = _active_session_var.set(session) + if self_arg is not None: + _patch_repositories(self_arg, session) + try: + result = await func(*args, **kwargs) + await session.commit() + return result + except BaseException as exc: + # A BaseException that is not an Exception (CancelledError, KeyboardInterrupt, + # SystemExit) always rolls back, regardless of rollback_for. + if not isinstance(exc, Exception): + await session.rollback() + elif isinstance(exc, tuple(no_rollback_for)): + await session.commit() + elif isinstance(exc, tuple(rollback_for)): + await session.rollback() + else: + await session.commit() + raise + finally: + _active_session_var.reset(token) def reactive_transactional( session_factory: async_sessionmaker[AsyncSession], ) -> Callable[[F], F]: - """Decorator for declarative async transaction management. + """Decorator for declarative async transaction management with an explicit session factory. + + Wraps an async function in a database transaction. The decorated function receives an + ``AsyncSession`` as its first argument. On success the transaction is committed; on exception + it is rolled back and the exception re-raised. - Wraps an async function in a database transaction. The decorated function - receives an AsyncSession as its first argument. On success the transaction - is committed; on exception it is rolled back and the exception re-raised. + Usage:: - Usage: @reactive_transactional(session_factory) async def create_user(session: AsyncSession) -> User: user = User(name="Alice") diff --git a/src/pyfly/data/transactional.py b/src/pyfly/data/transactional.py new file mode 100644 index 00000000..67274d01 --- /dev/null +++ b/src/pyfly/data/transactional.py @@ -0,0 +1,123 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Unified declarative transaction management — one ``@transactional`` for every backend. + +Like Spring's ``@Transactional``, the annotation is **uniform**; the transaction *manager* is +backend-specific and selected at call time from what the service exposes: + +- a relational ``async_sessionmaker`` on ``self._session_factory`` → SQLAlchemy transaction + (full propagation / isolation / read-only / rollback semantics), or +- a MongoDB client on ``self._motor_client`` → Mongo `ClientSession` transaction. + +The backend adapters provide the execution (``run_relational_transaction`` / +``run_mongo_transaction``); they are imported lazily so this core module pulls in neither +SQLAlchemy nor Motor. +""" + +from __future__ import annotations + +import enum +import functools +from collections.abc import Callable +from typing import Any, TypeVar + +F = TypeVar("F", bound=Callable[..., Any]) + + +class Propagation(enum.Enum): + """Transaction propagation behaviour (relational backend).""" + + REQUIRED = "REQUIRED" + REQUIRES_NEW = "REQUIRES_NEW" + SUPPORTS = "SUPPORTS" + NOT_SUPPORTED = "NOT_SUPPORTED" + NEVER = "NEVER" + MANDATORY = "MANDATORY" + + +class Isolation(enum.Enum): + """Transaction isolation level (relational backend).""" + + DEFAULT = "DEFAULT" + READ_UNCOMMITTED = "READ UNCOMMITTED" + READ_COMMITTED = "READ COMMITTED" + REPEATABLE_READ = "REPEATABLE READ" + SERIALIZABLE = "SERIALIZABLE" + + +def transactional( + func: F | None = None, + /, + *, + propagation: Propagation = Propagation.REQUIRED, + isolation: Isolation = Isolation.DEFAULT, + read_only: bool = False, + rollback_for: tuple[type[BaseException], ...] = (Exception,), + no_rollback_for: tuple[type[BaseException], ...] = (), +) -> Any: + """Declarative transaction management — the single annotation for **all** backends. + + Works bare (``@transactional``) or parametrized (``@transactional(propagation=...)``). At + call time it dispatches to the transaction manager the service exposes: + + - **Relational** (``self._session_factory`` is an ``async_sessionmaker``): opens a session, + applies ``propagation`` / ``isolation`` / ``read_only``, commits on success and rolls back + per ``rollback_for`` / ``no_rollback_for``, and patches the service's repositories onto the + active session. + - **Document** (``self._motor_client`` is an ``AsyncIOMotorClient``): opens a Mongo session + + transaction, injects it as the ``session`` kwarg, commits on success and aborts per + ``rollback_for`` / ``no_rollback_for``. (``propagation`` / ``isolation`` / ``read_only`` are + relational concepts and are ignored on the document backend.) + + Raises ``RuntimeError`` if the service exposes neither manager. + """ + + def decorator(fn: F) -> F: + @functools.wraps(fn) + async def wrapper(*args: Any, **kwargs: Any) -> Any: + self_arg = args[0] if args else None + + if getattr(self_arg, "_session_factory", None) is not None: + from pyfly.data.relational.sqlalchemy.transactional import run_relational_transaction + + return await run_relational_transaction( + fn, + args, + kwargs, + propagation=propagation, + isolation=isolation, + read_only=read_only, + rollback_for=rollback_for, + no_rollback_for=no_rollback_for, + ) + + if getattr(self_arg, "_motor_client", None) is not None: + from pyfly.data.document.mongodb.transactional import run_mongo_transaction + + return await run_mongo_transaction( + fn, args, kwargs, rollback_for=rollback_for, no_rollback_for=no_rollback_for + ) + + raise RuntimeError( + f"{fn.__qualname__}: @transactional found no transaction manager on the service. " + "Expose a relational '_session_factory' (async_sessionmaker) or a document " + "'_motor_client' (AsyncIOMotorClient)." + ) + + wrapper.__pyfly_transactional__ = True # type: ignore[attr-defined] + wrapper.__pyfly_propagation__ = propagation # type: ignore[attr-defined] + wrapper.__pyfly_isolation__ = isolation # type: ignore[attr-defined] + return wrapper # type: ignore[return-value] + + return decorator(func) if func is not None else decorator diff --git a/tests/data/test_transactional.py b/tests/data/test_transactional.py index 258d8a2e..5da6010b 100644 --- a/tests/data/test_transactional.py +++ b/tests/data/test_transactional.py @@ -385,8 +385,10 @@ class Holder: class TestErrorCases: @pytest.mark.asyncio async def test_no_session_factory_raises(self) -> None: + # A service exposing neither a relational _session_factory nor a document _motor_client + # gets the unified "no transaction manager" error (it names both expected attributes). svc = _NoFactoryService() - with pytest.raises(RuntimeError, match="No _session_factory"): + with pytest.raises(RuntimeError, match="no transaction manager"): await svc.do_work() @pytest.mark.asyncio diff --git a/tests/data/test_unified_transactional.py b/tests/data/test_unified_transactional.py new file mode 100644 index 00000000..8e26aa53 --- /dev/null +++ b/tests/data/test_unified_transactional.py @@ -0,0 +1,193 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Unified @transactional (v26.06.75) — ONE annotation dispatching to relational or document. + +Proves a single `@transactional` works on both backends and closes the previously-untested +MongoDB transaction path (commit / abort / no_rollback_for-commit / session injection / errors). +""" + +from __future__ import annotations + +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from pyfly.data import Isolation, transactional +from pyfly.data.document.mongodb import mongo_transactional + + +def test_one_annotation_is_shared_across_backends() -> None: + from pyfly.data.relational.sqlalchemy import transactional as relational_transactional + + assert transactional is relational_transactional + assert mongo_transactional is transactional # deprecated alias + + +# --------------------------------------------------------------------------- relational dispatch +def _make_session() -> MagicMock: + session = MagicMock() + session.begin = AsyncMock() + session.commit = AsyncMock() + session.rollback = AsyncMock() + session.info = {} + session.execution_options = MagicMock(return_value=session) + session.__aenter__ = AsyncMock(return_value=session) + session.__aexit__ = AsyncMock(return_value=False) + return session + + +def _relational_service(session: MagicMock) -> Any: + factory = MagicMock(return_value=session) + + class Svc: + def __init__(self) -> None: + self._session_factory = factory + + @transactional() + async def commit_path(self) -> str: + return "ok" + + @transactional(isolation=Isolation.SERIALIZABLE) + async def with_isolation(self) -> str: + return "ok" + + @transactional() + async def failing(self) -> str: + raise ValueError("boom") + + return Svc() + + +@pytest.mark.asyncio +async def test_relational_commits_on_success() -> None: + session = _make_session() + await _relational_service(session).commit_path() + session.commit.assert_awaited_once() + session.rollback.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_relational_isolation_execution_option_is_applied() -> None: + session = _make_session() + await _relational_service(session).with_isolation() + # the audit flagged this as unverified: assert the isolation_level actually reaches the session + session.execution_options.assert_called_once_with(isolation_level="SERIALIZABLE") + + +@pytest.mark.asyncio +async def test_relational_rolls_back_on_exception() -> None: + session = _make_session() + with pytest.raises(ValueError, match="boom"): + await _relational_service(session).failing() + session.rollback.assert_awaited_once() + session.commit.assert_not_awaited() + + +# --------------------------------------------------------------------------- document dispatch +class _FakeTxn: + """Mimics motor's session.start_transaction() async CM: commit on clean exit, abort on error.""" + + def __init__(self, session: _FakeMongoSession) -> None: + self._session = session + + async def __aenter__(self) -> _FakeTxn: + return self + + async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> bool: + if exc_type is None: + self._session.committed = True + else: + self._session.aborted = True + return False + + +class _FakeMongoSession: + def __init__(self) -> None: + self.committed = False + self.aborted = False + + async def __aenter__(self) -> _FakeMongoSession: + return self + + async def __aexit__(self, *exc: Any) -> bool: + return False + + def start_transaction(self) -> _FakeTxn: + return _FakeTxn(self) + + +class _FakeMotorClient: + def __init__(self, session: _FakeMongoSession) -> None: + self._session = session + + async def start_session(self) -> _FakeMongoSession: + return self._session + + +def _document_service(session: _FakeMongoSession) -> Any: + client = _FakeMotorClient(session) + + class Svc: + def __init__(self) -> None: + self._motor_client = client + + @transactional() + async def commit_path(self, *, session: Any = None) -> Any: + return session # session must be injected + + @transactional() + async def fails(self, *, session: Any = None) -> None: + raise ValueError("boom") + + @transactional(no_rollback_for=(KeyError,)) + async def fails_no_rollback(self, *, session: Any = None) -> None: + raise KeyError("ignored") + + return Svc() + + +@pytest.mark.asyncio +async def test_document_commits_and_injects_session() -> None: + session = _FakeMongoSession() + injected = await _document_service(session).commit_path() + assert injected is session # session kwarg injected + assert session.committed and not session.aborted + + +@pytest.mark.asyncio +async def test_document_aborts_on_rollback_for_exception() -> None: + session = _FakeMongoSession() + with pytest.raises(ValueError, match="boom"): + await _document_service(session).fails() + assert session.aborted and not session.committed + + +@pytest.mark.asyncio +async def test_document_commits_on_no_rollback_for_exception() -> None: + session = _FakeMongoSession() + with pytest.raises(KeyError): + await _document_service(session).fails_no_rollback() + # no_rollback_for -> committed despite the exception, which is still re-raised + assert session.committed and not session.aborted + + +@pytest.mark.asyncio +async def test_document_missing_motor_client_raises() -> None: + class NoClient: + @transactional() + async def work(self, *, session: Any = None) -> None: ... + + with pytest.raises(RuntimeError, match="no transaction manager"): + await NoClient().work() diff --git a/uv.lock b/uv.lock index 0f13d258..33159846 100644 --- a/uv.lock +++ b/uv.lock @@ -1981,7 +1981,7 @@ wheels = [ [[package]] name = "pyfly" -version = "26.6.74" +version = "26.6.75" source = { editable = "." } dependencies = [ { name = "pydantic" },