diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4e17ef2a..07a04204 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,34 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
---
+## v26.06.75 (2026-06-07)
+
+### Changed (data — unified `@transactional` for every backend)
+
+There is now **one** `@transactional` annotation for both relational and document services,
+imported from `pyfly.data` (Spring's uniform-annotation model). It dispatches at call time to the
+transaction manager the service exposes:
+
+- relational `async_sessionmaker` on `self._session_factory` → SQLAlchemy transaction (propagation,
+ isolation, read-only, `rollback_for`/`no_rollback_for`, repository session patching);
+- MongoDB client on `self._motor_client` → Mongo session + transaction (session injected as the
+ `session` kwarg, commit/abort with `rollback_for`/`no_rollback_for` parity).
+
+The decorator is backend-neutral (`pyfly.data.transactional`) and imports neither SQLAlchemy nor
+Motor at module scope; each backend supplies a lazily-imported runner (`run_relational_transaction`
+/ `run_mongo_transaction`). `from pyfly.data.relational.sqlalchemy import transactional` still works
+(same object); **`mongo_transactional` is now a deprecated alias** of `@transactional`.
+
+### Fixed / tested
+
+- The MongoDB transaction path — previously **completely untested** — now has behavior tests
+ (commit, abort on `rollback_for`, commit-and-re-raise on `no_rollback_for`, session injection,
+ missing-client error) and gained `rollback_for`/`no_rollback_for` parity with relational.
+- Relational isolation is now verified (a test asserts the `isolation_level` execution option
+ actually reaches the session — previously only the enum/metadata were checked).
+- Docs (`data.md`, `data-document.md`) + the companion `implement-data-repository` skill updated to
+ teach the single annotation; the doc's broken `@mongo_transactional(client)` example is corrected.
+
## v26.06.74 (2026-06-07)
### Docs (data — query-mechanism decision guide + Spring-Data parity matrix)
diff --git a/README.md b/README.md
index 58d2dc5b..90a548b2 100644
--- a/README.md
+++ b/README.md
@@ -11,7 +11,7 @@
-
+
diff --git a/docs/modules/data-document.md b/docs/modules/data-document.md
index 418ce46f..ba3ceebd 100644
--- a/docs/modules/data-document.md
+++ b/docs/modules/data-document.md
@@ -797,44 +797,48 @@ Source files:
## Transaction Management
-### mongo_transactional Decorator
+### The unified `@transactional` decorator
-The `@mongo_transactional` decorator provides declarative async transaction management for MongoDB, mirroring the `@reactive_transactional` decorator from the SQLAlchemy adapter:
+MongoDB uses the **same** `@transactional` annotation as the relational adapter — import it from
+`pyfly.data`. On a service exposing a Motor client as `self._motor_client`, it opens a Mongo
+session + transaction, injects the session as the `session` keyword argument, commits on success,
+and aborts on error (honouring `rollback_for` / `no_rollback_for`, like the relational backend):
```python
-from pyfly.data.document.mongodb import mongo_transactional
-from motor.motor_asyncio import AsyncIOMotorClient
-
-client: AsyncIOMotorClient = ...
-
-
-@mongo_transactional(client)
-async def transfer_funds(from_id: str, to_id: str, amount: float) -> None:
- from_account = await AccountDocument.get(from_id)
- to_account = await AccountDocument.get(to_id)
+from pyfly.container import service
+from pyfly.data import transactional
- from_account.balance -= amount
- to_account.balance += amount
- await from_account.save()
- await to_account.save()
- # Transaction is committed automatically on success
- # Transaction is aborted automatically on exception
+@service
+class AccountService:
+ def __init__(self, motor_client) -> None:
+ self._motor_client = motor_client # selects the MongoDB transaction manager
+
+ @transactional()
+ async def transfer_funds(self, from_id: str, to_id: str, amount: float, *, session=None) -> None:
+ from_account = await AccountDocument.get(from_id)
+ to_account = await AccountDocument.get(to_id)
+ from_account.balance -= amount
+ to_account.balance += amount
+ await from_account.save()
+ await to_account.save()
+ # Committed automatically on success; aborted automatically on exception.
```
**How it works:**
-1. Starts a new Motor session via `client.start_session()`.
-2. Begins a transaction on the session via `session.start_transaction()`.
-3. Calls the wrapped function with all original arguments.
-4. On success: the transaction is committed (via the `async with` context manager).
-5. On exception: the transaction is aborted and the exception is re-raised.
+1. Resolves the Motor client from `self._motor_client`.
+2. Opens a session (`client.start_session()`) and a transaction (`session.start_transaction()`).
+3. Injects the live `ClientSession` as the `session` keyword argument and calls the function.
+4. On success the transaction commits; on an exception in `rollback_for` it aborts; an exception
+ in `no_rollback_for` commits and is then re-raised (mirroring the relational semantics).
-Unlike `@reactive_transactional` (which injects the session as the first argument), `@mongo_transactional` does not inject the session. The Beanie document operations automatically participate in the active transaction through Motor's session context.
+> **Deprecated:** `from pyfly.data.document.mongodb import mongo_transactional` still works but is
+> a thin alias of `@transactional`. Prefer `@transactional`.
### Replica Set Requirement
-MongoDB transactions require a replica set deployment. Standalone MongoDB instances do not support multi-document transactions. If you attempt to use `@mongo_transactional` against a standalone instance, MongoDB will raise an error.
+MongoDB transactions require a replica set deployment. Standalone MongoDB instances do not support multi-document transactions. If you attempt to use `@transactional` (or the deprecated `@mongo_transactional`) against a standalone instance, MongoDB will raise an error.
For local development, you can run a single-node replica set:
diff --git a/docs/modules/data.md b/docs/modules/data.md
index 54bed293..727de8f2 100644
--- a/docs/modules/data.md
+++ b/docs/modules/data.md
@@ -1076,7 +1076,7 @@ Some capabilities are **backend-specific** today:
| Soft delete (`SoftDeleteRepository`) | ✅ | ❌ not yet |
| Optimistic locking (`VersionedMixin` / `@Version`) | ✅ | ❌ not yet |
| Auditing auto-population | ✅ `created/updated_at` **and** `created/updated_by` | ⚠️ timestamps at insert only |
-| `@transactional` (propagation, isolation, read-only, `rollback_for`) | ✅ | ⚠️ basic commit/abort (`@mongo_transactional`, replica set required) |
+| `@transactional` — one annotation, both backends (`pyfly.data`) | ✅ propagation / isolation / read-only / `rollback_for` | ✅ commit/abort + `rollback_for` (replica set; propagation/isolation are relational-only) |
**Not yet implemented on either backend** (so you don't reach for them): `@Modifying`-style
declarative bulk `UPDATE`/`DELETE`; `Slice` (count-less paging) and streaming/reactive result
diff --git a/pyproject.toml b/pyproject.toml
index bcca7087..7a5b5048 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -7,7 +7,7 @@ name = "pyfly"
# CalVer YY.MM.PATCH — package metadata uses PEP 440 normalized form (26.5.4);
# git tag, GitHub release and human-readable display use leading-zero form
# (v26.05.04) to match the Java/.NET/Go siblings.
-version = "26.6.74"
+version = "26.6.75"
description = "The official Python implementation of the Firefly Framework — DI, CQRS, EDA, hexagonal architecture, and more."
readme = "README.md"
license = "Apache-2.0"
diff --git a/src/pyfly/__init__.py b/src/pyfly/__init__.py
index 2972c1bf..1c4ac46e 100644
--- a/src/pyfly/__init__.py
+++ b/src/pyfly/__init__.py
@@ -13,4 +13,4 @@
# limitations under the License.
"""PyFly — Enterprise Python Framework."""
-__version__ = "26.06.74"
+__version__ = "26.06.75"
diff --git a/src/pyfly/data/__init__.py b/src/pyfly/data/__init__.py
index f3670224..463d616c 100644
--- a/src/pyfly/data/__init__.py
+++ b/src/pyfly/data/__init__.py
@@ -36,27 +36,31 @@
from pyfly.data.query import query
from pyfly.data.query_parser import QueryMethodParser
from pyfly.data.specification import Specification
+from pyfly.data.transactional import Isolation, Propagation, transactional
__all__ = [
"BaseFilterUtils",
"BaseRepositoryPostProcessor",
"CrudRepository",
"DERIVED_PREFIXES",
+ "Isolation",
"Mapper",
- "default_mapper",
- "mapping",
"Order",
"Page",
"Pageable",
"PagingRepository",
+ "Propagation",
"QueryMethodCompilerPort",
"QueryMethodParser",
"RepositoryPort",
"SessionPort",
"Sort",
"Specification",
+ "default_mapper",
"is_projection",
+ "mapping",
"projection",
"projection_fields",
"query",
+ "transactional",
]
diff --git a/src/pyfly/data/document/mongodb/transactional.py b/src/pyfly/data/document/mongodb/transactional.py
index 4cbf4a03..ea5d6d34 100644
--- a/src/pyfly/data/document/mongodb/transactional.py
+++ b/src/pyfly/data/document/mongodb/transactional.py
@@ -11,51 +11,69 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-"""MongoDB transactional decorator — wraps async functions in a Mongo session + transaction."""
+"""MongoDB execution for the unified ``@transactional`` decorator.
+
+Use the backend-neutral ``@transactional`` from :mod:`pyfly.data` on document services too — it
+dispatches here when the service exposes a ``_motor_client``. ``mongo_transactional`` is kept as a
+**deprecated** alias of ``@transactional`` for backward compatibility.
+"""
from __future__ import annotations
-import functools
from collections.abc import Callable
-from typing import Any, TypeVar
-
-F = TypeVar("F", bound=Callable[..., Any])
-
+from typing import Any
-def mongo_transactional(func: F) -> F:
- """Wrap an async function in a MongoDB session and transaction.
+from pyfly.data.transactional import transactional
- The decorated function receives an extra ``session`` keyword argument
- bound to the active ``ClientSession``. If the function completes
- without error the transaction is committed; otherwise it is aborted.
- Requires a :class:`motor.motor_asyncio.AsyncIOMotorClient` to be
- resolvable from the first positional argument's ``_motor_client``
- attribute (typically ``self`` in a service bean).
+async def run_mongo_transaction(
+ func: Callable[..., Any],
+ args: tuple[Any, ...],
+ kwargs: dict[str, Any],
+ *,
+ rollback_for: tuple[type[BaseException], ...] = (Exception,),
+ no_rollback_for: tuple[type[BaseException], ...] = (),
+) -> Any:
+ """Execute *func* inside a MongoDB transaction (the document arm of ``@transactional``).
- Example::
+ Resolves the Motor client from ``self._motor_client``, opens a session + transaction, injects
+ the session as the ``session`` keyword argument, and commits on success. On error it aborts —
+ except for exception types in ``no_rollback_for`` (or any ``Exception`` not in ``rollback_for``),
+ which commit and then re-raise, mirroring the relational ``@transactional`` semantics. A
+ ``BaseException`` that is not an ``Exception`` (cancellation/shutdown) always aborts.
+ """
+ self_arg = args[0] if args else None
+ motor_client = getattr(self_arg, "_motor_client", None)
+ if motor_client is None:
+ raise RuntimeError(
+ f"{func.__qualname__}: cannot resolve Motor client. Ensure the service has a '_motor_client' attribute."
+ )
- class OrderService:
- def __init__(self, motor_client):
- self._motor_client = motor_client
+ async with await motor_client.start_session() as session:
+ kwargs["session"] = session
+ # session.start_transaction() commits on a clean context exit and aborts when an exception
+ # escapes it. To honour no_rollback_for we let such exceptions exit cleanly (commit) and
+ # re-raise them afterwards, rather than driving abort/commit manually (which would depend
+ # on Motor's lazy-vs-eager start_transaction semantics).
+ deferred: BaseException | None = None
+ result: Any = None
+ async with session.start_transaction():
+ try:
+ result = await func(*args, **kwargs)
+ except BaseException as exc:
+ if not isinstance(exc, Exception):
+ raise # cancellation/shutdown -> abort
+ if isinstance(exc, tuple(no_rollback_for)):
+ deferred = exc # commit, then surface
+ elif isinstance(exc, tuple(rollback_for)):
+ raise # -> abort
+ else:
+ deferred = exc # not in rollback_for -> commit, then surface
+ if deferred is not None:
+ raise deferred
+ return result
- @mongo_transactional
- async def place_order(self, order, *, session=None):
- ...
- """
- @functools.wraps(func)
- async def wrapper(*args: Any, **kwargs: Any) -> Any:
- # Resolve the Motor client from the first arg (self)
- self_arg = args[0] if args else None
- motor_client = getattr(self_arg, "_motor_client", None)
- if motor_client is None:
- raise RuntimeError(
- f"{func.__qualname__}: cannot resolve Motor client. Ensure the service has a '_motor_client' attribute."
- )
-
- async with await motor_client.start_session() as session, session.start_transaction():
- kwargs["session"] = session
- return await func(*args, **kwargs)
-
- return wrapper # type: ignore[return-value]
+# Backward-compatibility alias. Prefer the unified `@transactional` from `pyfly.data`.
+mongo_transactional = transactional
+"""Deprecated alias of :func:`pyfly.data.transactional.transactional`. Use ``@transactional``."""
diff --git a/src/pyfly/data/relational/sqlalchemy/transactional.py b/src/pyfly/data/relational/sqlalchemy/transactional.py
index 6a6b1bfb..b9da7a4e 100644
--- a/src/pyfly/data/relational/sqlalchemy/transactional.py
+++ b/src/pyfly/data/relational/sqlalchemy/transactional.py
@@ -11,12 +11,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-"""Declarative transaction management decorator."""
+"""SQLAlchemy (relational) execution for the unified ``@transactional`` decorator.
+
+The public ``@transactional`` annotation is the backend-neutral one in
+:mod:`pyfly.data.transactional` (re-exported here for backward compatibility). This module
+provides the relational *runner* it dispatches to (``run_relational_transaction``) plus
+``reactive_transactional`` for explicit-session use. ``Propagation`` / ``Isolation`` are
+re-exported from the core module.
+"""
from __future__ import annotations
import contextlib
-import enum
import functools
from collections.abc import Callable
from contextvars import ContextVar
@@ -25,6 +31,15 @@
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from pyfly.data.relational.routing import read_only as _read_only_scope
+from pyfly.data.transactional import Isolation, Propagation, transactional
+
+__all__ = [
+ "Isolation",
+ "Propagation",
+ "reactive_transactional",
+ "run_relational_transaction",
+ "transactional",
+]
F = TypeVar("F", bound=Callable[..., Any])
@@ -34,27 +49,6 @@
)
-class Propagation(enum.Enum):
- """Transaction propagation behaviour."""
-
- REQUIRED = "REQUIRED"
- REQUIRES_NEW = "REQUIRES_NEW"
- SUPPORTS = "SUPPORTS"
- NOT_SUPPORTED = "NOT_SUPPORTED"
- NEVER = "NEVER"
- MANDATORY = "MANDATORY"
-
-
-class Isolation(enum.Enum):
- """Transaction isolation level."""
-
- DEFAULT = "DEFAULT"
- READ_UNCOMMITTED = "READ UNCOMMITTED"
- READ_COMMITTED = "READ COMMITTED"
- REPEATABLE_READ = "REPEATABLE READ"
- SERIALIZABLE = "SERIALIZABLE"
-
-
def _patch_repositories(self_arg: Any, session: AsyncSession) -> None:
from pyfly.data.relational.sqlalchemy.repository import Repository
@@ -73,112 +67,102 @@ def _resolve_session_factory(self_arg: Any) -> async_sessionmaker[AsyncSession]
return factory
-def transactional(
+async def run_relational_transaction(
+ func: Callable[..., Any],
+ args: tuple[Any, ...],
+ kwargs: dict[str, Any],
+ *,
propagation: Propagation = Propagation.REQUIRED,
isolation: Isolation = Isolation.DEFAULT,
read_only: bool = False,
rollback_for: tuple[type[BaseException], ...] = (Exception,),
no_rollback_for: tuple[type[BaseException], ...] = (),
-) -> Callable[[F], F]:
- """DI-aware declarative transaction management decorator.
+) -> Any:
+ """Execute *func* inside a SQLAlchemy transaction (the relational arm of ``@transactional``).
- Resolves ``async_sessionmaker`` from ``self._session_factory`` and uses
- a ``ContextVar`` for propagation semantics modelled after Spring's
- ``@Transactional``.
+ Resolves ``async_sessionmaker`` from ``self._session_factory`` and uses a ``ContextVar`` for
+ propagation semantics modelled after Spring's ``@Transactional``.
"""
-
- def decorator(func: F) -> F:
- @functools.wraps(func)
- async def wrapper(*args: Any, **kwargs: Any) -> Any:
- self_arg = args[0] if args else None
- existing: AsyncSession | None = _active_session_var.get()
-
- if propagation is Propagation.NEVER:
- if existing is not None:
- raise RuntimeError("Propagation.NEVER — active transaction exists")
- return await func(*args, **kwargs)
-
- if propagation is Propagation.NOT_SUPPORTED:
- token = _active_session_var.set(None)
- try:
- return await func(*args, **kwargs)
- finally:
- _active_session_var.reset(token)
-
- if propagation is Propagation.SUPPORTS:
- return await func(*args, **kwargs)
-
- if propagation is Propagation.MANDATORY:
- if existing is None:
- raise RuntimeError("Propagation.MANDATORY — no active transaction")
- return await func(*args, **kwargs)
-
- if propagation is Propagation.REQUIRED and existing is not None:
- return await func(*args, **kwargs)
-
- session_factory = _resolve_session_factory(self_arg) if self_arg is not None else None
- if session_factory is None:
- raise RuntimeError(
- "No _session_factory available on self — ensure the service has an injected async_sessionmaker"
- )
-
- execution_options: dict[str, Any] = {}
- if isolation is not Isolation.DEFAULT:
- execution_options["isolation_level"] = isolation.value
-
- # read_only routes to the replica when the session factory is a
- # RoutingSessionFactory (pyfly.data.relational.routing) and flags the session.
- ro_scope = _read_only_scope() if read_only else contextlib.nullcontext()
- with ro_scope:
- async with session_factory() as session:
- if read_only:
- session.info["read_only"] = True
- if execution_options:
- session = session.execution_options(**execution_options) # type: ignore[attr-defined]
-
- await session.begin()
- token = _active_session_var.set(session)
- if self_arg is not None:
- _patch_repositories(self_arg, session)
- try:
- result = await func(*args, **kwargs)
- await session.commit()
- return result
- except BaseException as exc:
- # Never commit partial work on cancellation/shutdown: a BaseException
- # that is not an Exception (asyncio.CancelledError, KeyboardInterrupt,
- # SystemExit) always rolls back, regardless of rollback_for.
- if not isinstance(exc, Exception):
- await session.rollback()
- elif isinstance(exc, tuple(no_rollback_for)):
- await session.commit()
- elif isinstance(exc, tuple(rollback_for)):
- await session.rollback()
- else:
- await session.commit()
- raise
- finally:
- _active_session_var.reset(token)
-
- wrapper.__pyfly_transactional__ = True # type: ignore[attr-defined]
- wrapper.__pyfly_propagation__ = propagation # type: ignore[attr-defined]
- wrapper.__pyfly_isolation__ = isolation # type: ignore[attr-defined]
-
- return wrapper # type: ignore[return-value]
-
- return decorator
+ self_arg = args[0] if args else None
+ existing: AsyncSession | None = _active_session_var.get()
+
+ if propagation is Propagation.NEVER:
+ if existing is not None:
+ raise RuntimeError("Propagation.NEVER — active transaction exists")
+ return await func(*args, **kwargs)
+
+ if propagation is Propagation.NOT_SUPPORTED:
+ token = _active_session_var.set(None)
+ try:
+ return await func(*args, **kwargs)
+ finally:
+ _active_session_var.reset(token)
+
+ if propagation is Propagation.SUPPORTS:
+ return await func(*args, **kwargs)
+
+ if propagation is Propagation.MANDATORY:
+ if existing is None:
+ raise RuntimeError("Propagation.MANDATORY — no active transaction")
+ return await func(*args, **kwargs)
+
+ if propagation is Propagation.REQUIRED and existing is not None:
+ return await func(*args, **kwargs)
+
+ session_factory = _resolve_session_factory(self_arg) if self_arg is not None else None
+ if session_factory is None:
+ raise RuntimeError(
+ "No _session_factory available on self — ensure the service has an injected async_sessionmaker"
+ )
+
+ execution_options: dict[str, Any] = {}
+ if isolation is not Isolation.DEFAULT:
+ execution_options["isolation_level"] = isolation.value
+
+ # read_only routes to the replica when the session factory is a RoutingSessionFactory
+ # (pyfly.data.relational.routing) and flags the session.
+ ro_scope = _read_only_scope() if read_only else contextlib.nullcontext()
+ with ro_scope:
+ async with session_factory() as session:
+ if read_only:
+ session.info["read_only"] = True
+ if execution_options:
+ session = session.execution_options(**execution_options) # type: ignore[attr-defined]
+ await session.begin()
+ token = _active_session_var.set(session)
+ if self_arg is not None:
+ _patch_repositories(self_arg, session)
+ try:
+ result = await func(*args, **kwargs)
+ await session.commit()
+ return result
+ except BaseException as exc:
+ # A BaseException that is not an Exception (CancelledError, KeyboardInterrupt,
+ # SystemExit) always rolls back, regardless of rollback_for.
+ if not isinstance(exc, Exception):
+ await session.rollback()
+ elif isinstance(exc, tuple(no_rollback_for)):
+ await session.commit()
+ elif isinstance(exc, tuple(rollback_for)):
+ await session.rollback()
+ else:
+ await session.commit()
+ raise
+ finally:
+ _active_session_var.reset(token)
def reactive_transactional(
session_factory: async_sessionmaker[AsyncSession],
) -> Callable[[F], F]:
- """Decorator for declarative async transaction management.
+ """Decorator for declarative async transaction management with an explicit session factory.
+
+ Wraps an async function in a database transaction. The decorated function receives an
+ ``AsyncSession`` as its first argument. On success the transaction is committed; on exception
+ it is rolled back and the exception re-raised.
- Wraps an async function in a database transaction. The decorated function
- receives an AsyncSession as its first argument. On success the transaction
- is committed; on exception it is rolled back and the exception re-raised.
+ Usage::
- Usage:
@reactive_transactional(session_factory)
async def create_user(session: AsyncSession) -> User:
user = User(name="Alice")
diff --git a/src/pyfly/data/transactional.py b/src/pyfly/data/transactional.py
new file mode 100644
index 00000000..67274d01
--- /dev/null
+++ b/src/pyfly/data/transactional.py
@@ -0,0 +1,123 @@
+# Copyright 2026 Firefly Software Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Unified declarative transaction management — one ``@transactional`` for every backend.
+
+Like Spring's ``@Transactional``, the annotation is **uniform**; the transaction *manager* is
+backend-specific and selected at call time from what the service exposes:
+
+- a relational ``async_sessionmaker`` on ``self._session_factory`` → SQLAlchemy transaction
+ (full propagation / isolation / read-only / rollback semantics), or
+- a MongoDB client on ``self._motor_client`` → Mongo `ClientSession` transaction.
+
+The backend adapters provide the execution (``run_relational_transaction`` /
+``run_mongo_transaction``); they are imported lazily so this core module pulls in neither
+SQLAlchemy nor Motor.
+"""
+
+from __future__ import annotations
+
+import enum
+import functools
+from collections.abc import Callable
+from typing import Any, TypeVar
+
+F = TypeVar("F", bound=Callable[..., Any])
+
+
+class Propagation(enum.Enum):
+ """Transaction propagation behaviour (relational backend)."""
+
+ REQUIRED = "REQUIRED"
+ REQUIRES_NEW = "REQUIRES_NEW"
+ SUPPORTS = "SUPPORTS"
+ NOT_SUPPORTED = "NOT_SUPPORTED"
+ NEVER = "NEVER"
+ MANDATORY = "MANDATORY"
+
+
+class Isolation(enum.Enum):
+ """Transaction isolation level (relational backend)."""
+
+ DEFAULT = "DEFAULT"
+ READ_UNCOMMITTED = "READ UNCOMMITTED"
+ READ_COMMITTED = "READ COMMITTED"
+ REPEATABLE_READ = "REPEATABLE READ"
+ SERIALIZABLE = "SERIALIZABLE"
+
+
+def transactional(
+ func: F | None = None,
+ /,
+ *,
+ propagation: Propagation = Propagation.REQUIRED,
+ isolation: Isolation = Isolation.DEFAULT,
+ read_only: bool = False,
+ rollback_for: tuple[type[BaseException], ...] = (Exception,),
+ no_rollback_for: tuple[type[BaseException], ...] = (),
+) -> Any:
+ """Declarative transaction management — the single annotation for **all** backends.
+
+ Works bare (``@transactional``) or parametrized (``@transactional(propagation=...)``). At
+ call time it dispatches to the transaction manager the service exposes:
+
+ - **Relational** (``self._session_factory`` is an ``async_sessionmaker``): opens a session,
+ applies ``propagation`` / ``isolation`` / ``read_only``, commits on success and rolls back
+ per ``rollback_for`` / ``no_rollback_for``, and patches the service's repositories onto the
+ active session.
+ - **Document** (``self._motor_client`` is an ``AsyncIOMotorClient``): opens a Mongo session +
+ transaction, injects it as the ``session`` kwarg, commits on success and aborts per
+ ``rollback_for`` / ``no_rollback_for``. (``propagation`` / ``isolation`` / ``read_only`` are
+ relational concepts and are ignored on the document backend.)
+
+ Raises ``RuntimeError`` if the service exposes neither manager.
+ """
+
+ def decorator(fn: F) -> F:
+ @functools.wraps(fn)
+ async def wrapper(*args: Any, **kwargs: Any) -> Any:
+ self_arg = args[0] if args else None
+
+ if getattr(self_arg, "_session_factory", None) is not None:
+ from pyfly.data.relational.sqlalchemy.transactional import run_relational_transaction
+
+ return await run_relational_transaction(
+ fn,
+ args,
+ kwargs,
+ propagation=propagation,
+ isolation=isolation,
+ read_only=read_only,
+ rollback_for=rollback_for,
+ no_rollback_for=no_rollback_for,
+ )
+
+ if getattr(self_arg, "_motor_client", None) is not None:
+ from pyfly.data.document.mongodb.transactional import run_mongo_transaction
+
+ return await run_mongo_transaction(
+ fn, args, kwargs, rollback_for=rollback_for, no_rollback_for=no_rollback_for
+ )
+
+ raise RuntimeError(
+ f"{fn.__qualname__}: @transactional found no transaction manager on the service. "
+ "Expose a relational '_session_factory' (async_sessionmaker) or a document "
+ "'_motor_client' (AsyncIOMotorClient)."
+ )
+
+ wrapper.__pyfly_transactional__ = True # type: ignore[attr-defined]
+ wrapper.__pyfly_propagation__ = propagation # type: ignore[attr-defined]
+ wrapper.__pyfly_isolation__ = isolation # type: ignore[attr-defined]
+ return wrapper # type: ignore[return-value]
+
+ return decorator(func) if func is not None else decorator
diff --git a/tests/data/test_transactional.py b/tests/data/test_transactional.py
index 258d8a2e..5da6010b 100644
--- a/tests/data/test_transactional.py
+++ b/tests/data/test_transactional.py
@@ -385,8 +385,10 @@ class Holder:
class TestErrorCases:
@pytest.mark.asyncio
async def test_no_session_factory_raises(self) -> None:
+ # A service exposing neither a relational _session_factory nor a document _motor_client
+ # gets the unified "no transaction manager" error (it names both expected attributes).
svc = _NoFactoryService()
- with pytest.raises(RuntimeError, match="No _session_factory"):
+ with pytest.raises(RuntimeError, match="no transaction manager"):
await svc.do_work()
@pytest.mark.asyncio
diff --git a/tests/data/test_unified_transactional.py b/tests/data/test_unified_transactional.py
new file mode 100644
index 00000000..8e26aa53
--- /dev/null
+++ b/tests/data/test_unified_transactional.py
@@ -0,0 +1,193 @@
+# Copyright 2026 Firefly Software Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Unified @transactional (v26.06.75) — ONE annotation dispatching to relational or document.
+
+Proves a single `@transactional` works on both backends and closes the previously-untested
+MongoDB transaction path (commit / abort / no_rollback_for-commit / session injection / errors).
+"""
+
+from __future__ import annotations
+
+from typing import Any
+from unittest.mock import AsyncMock, MagicMock
+
+import pytest
+
+from pyfly.data import Isolation, transactional
+from pyfly.data.document.mongodb import mongo_transactional
+
+
+def test_one_annotation_is_shared_across_backends() -> None:
+ from pyfly.data.relational.sqlalchemy import transactional as relational_transactional
+
+ assert transactional is relational_transactional
+ assert mongo_transactional is transactional # deprecated alias
+
+
+# --------------------------------------------------------------------------- relational dispatch
+def _make_session() -> MagicMock:
+ session = MagicMock()
+ session.begin = AsyncMock()
+ session.commit = AsyncMock()
+ session.rollback = AsyncMock()
+ session.info = {}
+ session.execution_options = MagicMock(return_value=session)
+ session.__aenter__ = AsyncMock(return_value=session)
+ session.__aexit__ = AsyncMock(return_value=False)
+ return session
+
+
+def _relational_service(session: MagicMock) -> Any:
+ factory = MagicMock(return_value=session)
+
+ class Svc:
+ def __init__(self) -> None:
+ self._session_factory = factory
+
+ @transactional()
+ async def commit_path(self) -> str:
+ return "ok"
+
+ @transactional(isolation=Isolation.SERIALIZABLE)
+ async def with_isolation(self) -> str:
+ return "ok"
+
+ @transactional()
+ async def failing(self) -> str:
+ raise ValueError("boom")
+
+ return Svc()
+
+
+@pytest.mark.asyncio
+async def test_relational_commits_on_success() -> None:
+ session = _make_session()
+ await _relational_service(session).commit_path()
+ session.commit.assert_awaited_once()
+ session.rollback.assert_not_awaited()
+
+
+@pytest.mark.asyncio
+async def test_relational_isolation_execution_option_is_applied() -> None:
+ session = _make_session()
+ await _relational_service(session).with_isolation()
+ # the audit flagged this as unverified: assert the isolation_level actually reaches the session
+ session.execution_options.assert_called_once_with(isolation_level="SERIALIZABLE")
+
+
+@pytest.mark.asyncio
+async def test_relational_rolls_back_on_exception() -> None:
+ session = _make_session()
+ with pytest.raises(ValueError, match="boom"):
+ await _relational_service(session).failing()
+ session.rollback.assert_awaited_once()
+ session.commit.assert_not_awaited()
+
+
+# --------------------------------------------------------------------------- document dispatch
+class _FakeTxn:
+ """Mimics motor's session.start_transaction() async CM: commit on clean exit, abort on error."""
+
+ def __init__(self, session: _FakeMongoSession) -> None:
+ self._session = session
+
+ async def __aenter__(self) -> _FakeTxn:
+ return self
+
+ async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> bool:
+ if exc_type is None:
+ self._session.committed = True
+ else:
+ self._session.aborted = True
+ return False
+
+
+class _FakeMongoSession:
+ def __init__(self) -> None:
+ self.committed = False
+ self.aborted = False
+
+ async def __aenter__(self) -> _FakeMongoSession:
+ return self
+
+ async def __aexit__(self, *exc: Any) -> bool:
+ return False
+
+ def start_transaction(self) -> _FakeTxn:
+ return _FakeTxn(self)
+
+
+class _FakeMotorClient:
+ def __init__(self, session: _FakeMongoSession) -> None:
+ self._session = session
+
+ async def start_session(self) -> _FakeMongoSession:
+ return self._session
+
+
+def _document_service(session: _FakeMongoSession) -> Any:
+ client = _FakeMotorClient(session)
+
+ class Svc:
+ def __init__(self) -> None:
+ self._motor_client = client
+
+ @transactional()
+ async def commit_path(self, *, session: Any = None) -> Any:
+ return session # session must be injected
+
+ @transactional()
+ async def fails(self, *, session: Any = None) -> None:
+ raise ValueError("boom")
+
+ @transactional(no_rollback_for=(KeyError,))
+ async def fails_no_rollback(self, *, session: Any = None) -> None:
+ raise KeyError("ignored")
+
+ return Svc()
+
+
+@pytest.mark.asyncio
+async def test_document_commits_and_injects_session() -> None:
+ session = _FakeMongoSession()
+ injected = await _document_service(session).commit_path()
+ assert injected is session # session kwarg injected
+ assert session.committed and not session.aborted
+
+
+@pytest.mark.asyncio
+async def test_document_aborts_on_rollback_for_exception() -> None:
+ session = _FakeMongoSession()
+ with pytest.raises(ValueError, match="boom"):
+ await _document_service(session).fails()
+ assert session.aborted and not session.committed
+
+
+@pytest.mark.asyncio
+async def test_document_commits_on_no_rollback_for_exception() -> None:
+ session = _FakeMongoSession()
+ with pytest.raises(KeyError):
+ await _document_service(session).fails_no_rollback()
+ # no_rollback_for -> committed despite the exception, which is still re-raised
+ assert session.committed and not session.aborted
+
+
+@pytest.mark.asyncio
+async def test_document_missing_motor_client_raises() -> None:
+ class NoClient:
+ @transactional()
+ async def work(self, *, session: Any = None) -> None: ...
+
+ with pytest.raises(RuntimeError, match="no transaction manager"):
+ await NoClient().work()
diff --git a/uv.lock b/uv.lock
index 0f13d258..33159846 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1981,7 +1981,7 @@ wheels = [
[[package]]
name = "pyfly"
-version = "26.6.74"
+version = "26.6.75"
source = { editable = "." }
dependencies = [
{ name = "pydantic" },