From 283578de7fa5321c58c2464f1ec191adbe2d6de6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20Contreras=20Guill=C3=A9n?= Date: Sun, 7 Jun 2026 20:51:15 +0200 Subject: [PATCH] feat(session): Postgres SessionRegistry (postgres parity) + export SessionRegistry from ports + bump v26.06.68 PostgresSessionRegistry (session/adapters/postgres_registry.py): durable, queryable, cross-process session-concurrency registry over a Postgres table (session_id PK, principal, created_at). Selected via pyfly.session.concurrency.registry=postgres -> clustered maximumSessions with no Redis. Table created lazily+idempotently; table name validated against injection. Hexagonal: AsyncEngine resolved lazily + injected by the composition root; no module-scope sqlalchemy. SessionRegistry now exported from pyfly.session.ports (audit consistency fix). Tests: tests/session/test_postgres_registry.py (6, SQL-recording fake) + tests/integration/ test_postgres_session_registry_integration.py (real Postgres: upsert/oldest-first/count/deregister, PASSED). Gates: mypy --strict (633), ruff + format, full suite 3932 passed. --- CHANGELOG.md | 16 +++ README.md | 2 +- pyproject.toml | 2 +- src/pyfly/__init__.py | 2 +- .../session/adapters/postgres_registry.py | 115 ++++++++++++++++++ src/pyfly/session/auto_configuration.py | 21 +++- src/pyfly/session/ports/__init__.py | 3 +- ...t_postgres_session_registry_integration.py | 53 ++++++++ tests/session/test_postgres_registry.py | 115 ++++++++++++++++++ tests/test_hexagonal.py | 4 +- uv.lock | 2 +- 11 files changed, 325 insertions(+), 10 deletions(-) create mode 100644 src/pyfly/session/adapters/postgres_registry.py create mode 100644 tests/integration/test_postgres_session_registry_integration.py create mode 100644 tests/session/test_postgres_registry.py diff --git a/CHANGELOG.md b/CHANGELOG.md index f4fb85b2..5b034b2a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,22 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). --- +## v26.06.68 (2026-06-07) + +### Added (session — Postgres SessionRegistry; Postgres parity) + +- **`PostgresSessionRegistry`** (`pyfly.session.adapters.postgres_registry`) — a durable, + queryable, cross-process session-concurrency registry backed by a Postgres table + (`session_id` PK, `principal`, `created_at`), selected via + `pyfly.session.concurrency.registry=postgres`. Lets relational-only deployments run clustered + `maximumSessions` control with **no Redis** (the user's "postgres, not just redis"). The table + is created lazily and idempotently; the table name is validated against injection. Hexagonal: + the SQLAlchemy `AsyncEngine` is resolved lazily and injected by the composition root; the + adapter imports no SQLAlchemy at module scope. Validated against **real Postgres** + (testcontainers: upsert, oldest-first ordering, count, deregister). +- **`SessionRegistry`** is now exported from `pyfly.session.ports` alongside `SessionStore` + (consistency fix from the audit). + ## v26.06.67 (2026-06-07) ### Added (scheduling — pluggable task executor; fixes a weak default) diff --git a/README.md b/README.md index a0731c14..c309823a 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Firefly Framework Python 3.12+ License: Apache 2.0 - Version: 26.06.67 + Version: 26.06.68 Type Checked: mypy strict Code Style: Ruff Async First diff --git a/pyproject.toml b/pyproject.toml index 18d31a6d..db7baf4e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name = "pyfly" # CalVer YY.MM.PATCH — package metadata uses PEP 440 normalized form (26.5.4); # git tag, GitHub release and human-readable display use leading-zero form # (v26.05.04) to match the Java/.NET/Go siblings. -version = "26.6.67" +version = "26.6.68" description = "The official Python implementation of the Firefly Framework — DI, CQRS, EDA, hexagonal architecture, and more." readme = "README.md" license = "Apache-2.0" diff --git a/src/pyfly/__init__.py b/src/pyfly/__init__.py index b0b7b6ec..5d4d2431 100644 --- a/src/pyfly/__init__.py +++ b/src/pyfly/__init__.py @@ -13,4 +13,4 @@ # limitations under the License. """PyFly — Enterprise Python Framework.""" -__version__ = "26.06.67" +__version__ = "26.06.68" diff --git a/src/pyfly/session/adapters/postgres_registry.py b/src/pyfly/session/adapters/postgres_registry.py new file mode 100644 index 00000000..1f41e78a --- /dev/null +++ b/src/pyfly/session/adapters/postgres_registry.py @@ -0,0 +1,115 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Postgres table-backed :class:`~pyfly.session.concurrency.SessionRegistry` adapter. + +Durable, queryable, cross-process session concurrency control for relational-only deployments +(no Redis required) — the user's "postgres, not just redis". Hexagonal: the SQLAlchemy +``AsyncEngine`` is injected (lazily, via a factory) by the composition root; this module imports +no SQLAlchemy at module scope. The backing table is created lazily and idempotently on first use. +""" + +from __future__ import annotations + +import asyncio +import re +from collections.abc import Callable +from typing import Any + +# Guard against SQL injection via a misconfigured table name (it is interpolated, not bound). +_IDENT = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") + + +class PostgresSessionRegistry: + """Per-principal session index in a Postgres table (session_id PK, principal, created_at).""" + + def __init__(self, engine_factory: Callable[[], Any], *, table: str = "pyfly_session_registry") -> None: + if not _IDENT.match(table): + raise ValueError(f"Invalid session-registry table name: {table!r}") + self._engine_factory = engine_factory + self._engine: Any = None + self._table = table + self._ensured = False + self._guard = asyncio.Lock() + + def _eng(self) -> Any: + if self._engine is None: + self._engine = self._engine_factory() + return self._engine + + async def _ensure_table(self) -> None: + if self._ensured: + return + from sqlalchemy import text + + async with self._guard: + if self._ensured: + return + async with self._eng().begin() as conn: + await conn.execute( + text( + f"CREATE TABLE IF NOT EXISTS {self._table} (" + "session_id TEXT PRIMARY KEY, principal TEXT NOT NULL, " + "created_at DOUBLE PRECISION NOT NULL)" + ) + ) + await conn.execute( + text(f"CREATE INDEX IF NOT EXISTS {self._table}_principal_idx ON {self._table} (principal)") + ) + self._ensured = True + + async def register(self, principal: str, session_id: str, created_at: float) -> None: + from sqlalchemy import text + + await self._ensure_table() + async with self._eng().begin() as conn: + await conn.execute( + text( + f"INSERT INTO {self._table} (session_id, principal, created_at) " + "VALUES (:s, :p, :c) ON CONFLICT (session_id) " + "DO UPDATE SET principal = EXCLUDED.principal, created_at = EXCLUDED.created_at" + ), + {"s": session_id, "p": principal, "c": created_at}, + ) + + async def deregister(self, principal: str, session_id: str) -> None: + from sqlalchemy import text + + await self._ensure_table() + async with self._eng().begin() as conn: + await conn.execute( + text(f"DELETE FROM {self._table} WHERE principal = :p AND session_id = :s"), + {"p": principal, "s": session_id}, + ) + + async def list_sessions(self, principal: str) -> list[tuple[str, float]]: + from sqlalchemy import text + + await self._ensure_table() + async with self._eng().connect() as conn: + result = await conn.execute( + text(f"SELECT session_id, created_at FROM {self._table} WHERE principal = :p ORDER BY created_at ASC"), + {"p": principal}, + ) + return [(row[0], float(row[1])) for row in result.fetchall()] # ORDER BY -> oldest first + + async def count(self, principal: str) -> int: + from sqlalchemy import text + + await self._ensure_table() + async with self._eng().connect() as conn: + result = await conn.execute( + text(f"SELECT COUNT(*) FROM {self._table} WHERE principal = :p"), + {"p": principal}, + ) + return int(result.scalar() or 0) diff --git a/src/pyfly/session/auto_configuration.py b/src/pyfly/session/auto_configuration.py index 1792b31e..e92b47b9 100644 --- a/src/pyfly/session/auto_configuration.py +++ b/src/pyfly/session/auto_configuration.py @@ -15,8 +15,11 @@ from __future__ import annotations +from typing import Any + from pyfly.config.auto import AutoConfiguration from pyfly.container.bean import bean +from pyfly.container.container import Container from pyfly.context.conditions import ( auto_configuration, conditional_on_missing_bean, @@ -72,7 +75,7 @@ class SessionConcurrencyAutoConfiguration: @bean def session_concurrency_controller( - self, config: Config, session_store: SessionStore + self, config: Config, session_store: SessionStore, container: Container ) -> SessionConcurrencyController: from pyfly.session.concurrency import ( ConcurrencyControlPolicy, @@ -84,9 +87,10 @@ def session_concurrency_controller( max_sessions=int(config.get("pyfly.session.concurrency.max-sessions", -1)), strategy=str(config.get("pyfly.session.concurrency.strategy", "evict-oldest")), ) - # Registry backend: 'memory' (default, single-instance) or 'redis' (cross-process). - # The Redis client is built here (composition root) and injected — the adapter never - # imports redis. + # Registry backend: 'memory' (default, single-instance), 'redis' (cross-process), or + # 'postgres' (durable + cross-process, no Redis needed). The Redis client / SQLAlchemy + # engine are obtained here (the composition root) and injected — the adapters never + # import their driver at module scope. registry: SessionRegistry registry_type = str(config.get("pyfly.session.concurrency.registry", "memory")).lower() if registry_type == "redis" and AutoConfiguration.is_available("redis.asyncio"): @@ -99,6 +103,15 @@ def session_concurrency_controller( or config.get("pyfly.session.redis.url", "redis://localhost:6379/0") ) registry = RedisSessionRegistry(aioredis.from_url(url)) # type: ignore[no-untyped-call,unused-ignore] + elif registry_type == "postgres": + from pyfly.session.adapters.postgres_registry import PostgresSessionRegistry + + def _engine() -> Any: + from sqlalchemy.ext.asyncio import AsyncEngine + + return container.resolve(AsyncEngine) + + registry = PostgresSessionRegistry(_engine) else: registry = InMemorySessionRegistry() return SessionConcurrencyController(registry, policy, session_deleter=session_store.delete) diff --git a/src/pyfly/session/ports/__init__.py b/src/pyfly/session/ports/__init__.py index a516971d..368d6025 100644 --- a/src/pyfly/session/ports/__init__.py +++ b/src/pyfly/session/ports/__init__.py @@ -13,6 +13,7 @@ # limitations under the License. """Session ports — abstract interfaces for the session module.""" +from pyfly.session.concurrency import SessionRegistry from pyfly.session.ports.outbound import SessionStore -__all__ = ["SessionStore"] +__all__ = ["SessionRegistry", "SessionStore"] diff --git a/tests/integration/test_postgres_session_registry_integration.py b/tests/integration/test_postgres_session_registry_integration.py new file mode 100644 index 00000000..15b5d703 --- /dev/null +++ b/tests/integration/test_postgres_session_registry_integration.py @@ -0,0 +1,53 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Integration test: Postgres SessionRegistry against a real Postgres (v26.06.68).""" + +from __future__ import annotations + +from collections.abc import Iterator + +import pytest + +from pyfly.testing import postgres_container, pyfly_config_for, requires_docker + + +@pytest.fixture +def pg_url() -> Iterator[str]: + with postgres_container() as container: + yield pyfly_config_for(container)["pyfly.data.relational.url"] + + +@requires_docker +@pytest.mark.asyncio +async def test_postgres_session_registry_against_real_postgres(pg_url: str) -> None: + from sqlalchemy.ext.asyncio import create_async_engine + + from pyfly.session.adapters.postgres_registry import PostgresSessionRegistry + + engine = create_async_engine(pg_url) + try: + reg = PostgresSessionRegistry(lambda: engine) + await reg.register("alice", "s2", 2.0) + await reg.register("alice", "s1", 1.0) # older score, inserted second + assert await reg.count("alice") == 2 + assert [sid for sid, _ in await reg.list_sessions("alice")] == ["s1", "s2"] # oldest-first + + await reg.register("alice", "s2", 9.0) # upsert (same session_id) must not duplicate + assert await reg.count("alice") == 2 + + await reg.deregister("alice", "s1") + assert await reg.count("alice") == 1 + assert [sid for sid, _ in await reg.list_sessions("alice")] == ["s2"] + finally: + await engine.dispose() diff --git a/tests/session/test_postgres_registry.py b/tests/session/test_postgres_registry.py new file mode 100644 index 00000000..05e004cf --- /dev/null +++ b/tests/session/test_postgres_registry.py @@ -0,0 +1,115 @@ +# Copyright 2026 Firefly Software Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Postgres SessionRegistry adapter (v26.06.68) — unit tests with a SQL-recording fake engine.""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from pyfly.session.adapters.postgres_registry import PostgresSessionRegistry +from pyfly.session.concurrency import SessionRegistry + + +class _FakeResult: + def __init__(self, rows: list | None = None, scalar: Any = None) -> None: + self._rows = rows or [] + self._scalar = scalar + + def fetchall(self) -> list: + return self._rows + + def scalar(self) -> Any: + return self._scalar + + +class _FakeConn: + def __init__(self, sql_log: list[str]) -> None: + self._sql_log = sql_log + + async def __aenter__(self) -> _FakeConn: + return self + + async def __aexit__(self, *exc: Any) -> bool: + return False + + async def execute(self, statement: Any, params: dict | None = None) -> _FakeResult: + self._sql_log.append(str(statement)) + return _FakeResult() + + +class _FakeEngine: + def __init__(self) -> None: + self.sql: list[str] = [] + + def begin(self) -> _FakeConn: + return _FakeConn(self.sql) + + def connect(self) -> _FakeConn: + return _FakeConn(self.sql) + + +def test_rejects_invalid_table_name() -> None: + with pytest.raises(ValueError, match="table name"): + PostgresSessionRegistry(lambda: _FakeEngine(), table="bad; DROP TABLE users") + + +@pytest.mark.asyncio +async def test_satisfies_session_registry_protocol() -> None: + assert isinstance(PostgresSessionRegistry(lambda: _FakeEngine()), SessionRegistry) + + +@pytest.mark.asyncio +async def test_register_issues_upsert_and_creates_table_once() -> None: + engine = _FakeEngine() + reg = PostgresSessionRegistry(lambda: engine) + await reg.register("alice", "s1", 1.0) + await reg.register("alice", "s2", 2.0) + joined = " ".join(engine.sql) + assert "CREATE TABLE IF NOT EXISTS" in joined + assert joined.count("CREATE TABLE IF NOT EXISTS") == 1 # table ensured once, not per call + assert "INSERT INTO" in joined and "ON CONFLICT (session_id)" in joined + + +@pytest.mark.asyncio +async def test_list_orders_by_created_at_and_count_uses_count() -> None: + engine = _FakeEngine() + reg = PostgresSessionRegistry(lambda: engine) + await reg.list_sessions("alice") + await reg.count("alice") + joined = " ".join(engine.sql) + assert "ORDER BY created_at ASC" in joined + assert "SELECT COUNT(*)" in joined + + +@pytest.mark.asyncio +async def test_deregister_issues_delete() -> None: + engine = _FakeEngine() + reg = PostgresSessionRegistry(lambda: engine) + await reg.deregister("alice", "s1") + assert any("DELETE FROM" in s for s in engine.sql) + + +def test_provider_postgres_selection() -> None: + from pyfly.container.container import Container + from pyfly.core.config import Config + from pyfly.session.adapters.memory import InMemorySessionStore + from pyfly.session.auto_configuration import SessionConcurrencyAutoConfiguration + + cfg = Config({"pyfly": {"session": {"concurrency": {"registry": "postgres"}}}}) + controller = SessionConcurrencyAutoConfiguration().session_concurrency_controller( + cfg, InMemorySessionStore(), Container() + ) + assert isinstance(controller._registry, PostgresSessionRegistry) diff --git a/tests/test_hexagonal.py b/tests/test_hexagonal.py index 42d6d4d4..02944e89 100644 --- a/tests/test_hexagonal.py +++ b/tests/test_hexagonal.py @@ -66,7 +66,9 @@ def test_sqlalchemy_only_in_data_and_cli(self): "and '/transactional/persistence/' not in l " "and '/eventsourcing/' not in l " "and '/scheduling/adapters/' not in l " - "and 'scheduling/auto_configuration' not in l]; " + "and 'scheduling/auto_configuration' not in l " + "and '/session/adapters/' not in l " + "and 'session/auto_configuration' not in l]; " "print('\\n'.join(bad) if bad else 'CLEAN'); " "sys.exit(len(bad))", ], diff --git a/uv.lock b/uv.lock index da064438..4b3117ba 100644 --- a/uv.lock +++ b/uv.lock @@ -1981,7 +1981,7 @@ wheels = [ [[package]] name = "pyfly" -version = "26.6.67" +version = "26.6.68" source = { editable = "." } dependencies = [ { name = "pydantic" },