Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,22 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

---

## v26.06.68 (2026-06-07)

### Added (session — Postgres SessionRegistry; Postgres parity)

- **`PostgresSessionRegistry`** (`pyfly.session.adapters.postgres_registry`) — a durable,
queryable, cross-process session-concurrency registry backed by a Postgres table
(`session_id` PK, `principal`, `created_at`), selected via
`pyfly.session.concurrency.registry=postgres`. Lets relational-only deployments run clustered
`maximumSessions` control with **no Redis** (the user's "postgres, not just redis"). The table
is created lazily and idempotently; the table name is validated against injection. Hexagonal:
the SQLAlchemy `AsyncEngine` is resolved lazily and injected by the composition root; the
adapter imports no SQLAlchemy at module scope. Validated against **real Postgres**
(testcontainers: upsert, oldest-first ordering, count, deregister).
- **`SessionRegistry`** is now exported from `pyfly.session.ports` alongside `SessionStore`
(consistency fix from the audit).

## v26.06.67 (2026-06-07)

### Added (scheduling — pluggable task executor; fixes a weak default)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<a href="https://github.com/fireflyframework"><img src="https://img.shields.io/badge/Firefly_Framework-official-ff6600?logo=data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHZpZXdCb3g9IjAgMCAyNCAyNCI+PHBhdGggZmlsbD0id2hpdGUiIGQ9Ik0xMiAyQzYuNDggMiAyIDYuNDggMiAxMnM0LjQ4IDEwIDEwIDEwIDEwLTQuNDggMTAtMTBTMTcuNTIgMiAxMiAyeiIvPjwvc3ZnPg==" alt="Firefly Framework"></a>
<a href="https://www.python.org/"><img src="https://img.shields.io/badge/python-3.12%2B-blue?logo=python&logoColor=white" alt="Python 3.12+"></a>
<a href="LICENSE"><img src="https://img.shields.io/badge/license-Apache%202.0-green" alt="License: Apache 2.0"></a>
<a href="#"><img src="https://img.shields.io/badge/version-26.06.67-brightgreen" alt="Version: 26.06.67"></a>
<a href="#"><img src="https://img.shields.io/badge/version-26.06.68-brightgreen" alt="Version: 26.06.68"></a>
<a href="#"><img src="https://img.shields.io/badge/type--checked-mypy%20strict-blue?logo=python&logoColor=white" alt="Type Checked: mypy strict"></a>
<a href="#"><img src="https://img.shields.io/badge/code%20style-ruff-purple?logo=ruff&logoColor=white" alt="Code Style: Ruff"></a>
<a href="#"><img src="https://img.shields.io/badge/async-first-brightgreen" alt="Async First"></a>
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name = "pyfly"
# CalVer YY.MM.PATCH — package metadata uses PEP 440 normalized form (26.5.4);
# git tag, GitHub release and human-readable display use leading-zero form
# (v26.05.04) to match the Java/.NET/Go siblings.
version = "26.6.67"
version = "26.6.68"
description = "The official Python implementation of the Firefly Framework — DI, CQRS, EDA, hexagonal architecture, and more."
readme = "README.md"
license = "Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion src/pyfly/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# limitations under the License.
"""PyFly — Enterprise Python Framework."""

__version__ = "26.06.67"
__version__ = "26.06.68"
115 changes: 115 additions & 0 deletions src/pyfly/session/adapters/postgres_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Copyright 2026 Firefly Software Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Postgres table-backed :class:`~pyfly.session.concurrency.SessionRegistry` adapter.

Durable, queryable, cross-process session concurrency control for relational-only deployments
(no Redis required) — the user's "postgres, not just redis". Hexagonal: the SQLAlchemy
``AsyncEngine`` is injected (lazily, via a factory) by the composition root; this module imports
no SQLAlchemy at module scope. The backing table is created lazily and idempotently on first use.
"""

from __future__ import annotations

import asyncio
import re
from collections.abc import Callable
from typing import Any

# Guard against SQL injection via a misconfigured table name (it is interpolated, not bound).
_IDENT = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


class PostgresSessionRegistry:
"""Per-principal session index in a Postgres table (session_id PK, principal, created_at)."""

def __init__(self, engine_factory: Callable[[], Any], *, table: str = "pyfly_session_registry") -> None:
if not _IDENT.match(table):
raise ValueError(f"Invalid session-registry table name: {table!r}")
self._engine_factory = engine_factory
self._engine: Any = None
self._table = table
self._ensured = False
self._guard = asyncio.Lock()

def _eng(self) -> Any:
if self._engine is None:
self._engine = self._engine_factory()
return self._engine

async def _ensure_table(self) -> None:
if self._ensured:
return
from sqlalchemy import text

async with self._guard:
if self._ensured:
return
async with self._eng().begin() as conn:
await conn.execute(
text(
f"CREATE TABLE IF NOT EXISTS {self._table} ("
"session_id TEXT PRIMARY KEY, principal TEXT NOT NULL, "
"created_at DOUBLE PRECISION NOT NULL)"
)
)
await conn.execute(
text(f"CREATE INDEX IF NOT EXISTS {self._table}_principal_idx ON {self._table} (principal)")
)
self._ensured = True

async def register(self, principal: str, session_id: str, created_at: float) -> None:
from sqlalchemy import text

await self._ensure_table()
async with self._eng().begin() as conn:
await conn.execute(
text(
f"INSERT INTO {self._table} (session_id, principal, created_at) "
"VALUES (:s, :p, :c) ON CONFLICT (session_id) "
"DO UPDATE SET principal = EXCLUDED.principal, created_at = EXCLUDED.created_at"
),
{"s": session_id, "p": principal, "c": created_at},
)

async def deregister(self, principal: str, session_id: str) -> None:
from sqlalchemy import text

await self._ensure_table()
async with self._eng().begin() as conn:
await conn.execute(
text(f"DELETE FROM {self._table} WHERE principal = :p AND session_id = :s"),
{"p": principal, "s": session_id},
)

async def list_sessions(self, principal: str) -> list[tuple[str, float]]:
from sqlalchemy import text

await self._ensure_table()
async with self._eng().connect() as conn:
result = await conn.execute(
text(f"SELECT session_id, created_at FROM {self._table} WHERE principal = :p ORDER BY created_at ASC"),
{"p": principal},
)
return [(row[0], float(row[1])) for row in result.fetchall()] # ORDER BY -> oldest first

async def count(self, principal: str) -> int:
from sqlalchemy import text

await self._ensure_table()
async with self._eng().connect() as conn:
result = await conn.execute(
text(f"SELECT COUNT(*) FROM {self._table} WHERE principal = :p"),
{"p": principal},
)
return int(result.scalar() or 0)
21 changes: 17 additions & 4 deletions src/pyfly/session/auto_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@

from __future__ import annotations

from typing import Any

from pyfly.config.auto import AutoConfiguration
from pyfly.container.bean import bean
from pyfly.container.container import Container
from pyfly.context.conditions import (
auto_configuration,
conditional_on_missing_bean,
Expand Down Expand Up @@ -72,7 +75,7 @@ class SessionConcurrencyAutoConfiguration:

@bean
def session_concurrency_controller(
self, config: Config, session_store: SessionStore
self, config: Config, session_store: SessionStore, container: Container
) -> SessionConcurrencyController:
from pyfly.session.concurrency import (
ConcurrencyControlPolicy,
Expand All @@ -84,9 +87,10 @@ def session_concurrency_controller(
max_sessions=int(config.get("pyfly.session.concurrency.max-sessions", -1)),
strategy=str(config.get("pyfly.session.concurrency.strategy", "evict-oldest")),
)
# Registry backend: 'memory' (default, single-instance) or 'redis' (cross-process).
# The Redis client is built here (composition root) and injected — the adapter never
# imports redis.
# Registry backend: 'memory' (default, single-instance), 'redis' (cross-process), or
# 'postgres' (durable + cross-process, no Redis needed). The Redis client / SQLAlchemy
# engine are obtained here (the composition root) and injected — the adapters never
# import their driver at module scope.
registry: SessionRegistry
registry_type = str(config.get("pyfly.session.concurrency.registry", "memory")).lower()
if registry_type == "redis" and AutoConfiguration.is_available("redis.asyncio"):
Expand All @@ -99,6 +103,15 @@ def session_concurrency_controller(
or config.get("pyfly.session.redis.url", "redis://localhost:6379/0")
)
registry = RedisSessionRegistry(aioredis.from_url(url)) # type: ignore[no-untyped-call,unused-ignore]
elif registry_type == "postgres":
from pyfly.session.adapters.postgres_registry import PostgresSessionRegistry

def _engine() -> Any:
from sqlalchemy.ext.asyncio import AsyncEngine

return container.resolve(AsyncEngine)

registry = PostgresSessionRegistry(_engine)
else:
registry = InMemorySessionRegistry()
return SessionConcurrencyController(registry, policy, session_deleter=session_store.delete)
3 changes: 2 additions & 1 deletion src/pyfly/session/ports/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
"""Session ports — abstract interfaces for the session module."""

from pyfly.session.concurrency import SessionRegistry
from pyfly.session.ports.outbound import SessionStore

__all__ = ["SessionStore"]
__all__ = ["SessionRegistry", "SessionStore"]
53 changes: 53 additions & 0 deletions tests/integration/test_postgres_session_registry_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright 2026 Firefly Software Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Integration test: Postgres SessionRegistry against a real Postgres (v26.06.68)."""

from __future__ import annotations

from collections.abc import Iterator

import pytest

from pyfly.testing import postgres_container, pyfly_config_for, requires_docker


@pytest.fixture
def pg_url() -> Iterator[str]:
with postgres_container() as container:
yield pyfly_config_for(container)["pyfly.data.relational.url"]


@requires_docker
@pytest.mark.asyncio
async def test_postgres_session_registry_against_real_postgres(pg_url: str) -> None:
from sqlalchemy.ext.asyncio import create_async_engine

from pyfly.session.adapters.postgres_registry import PostgresSessionRegistry

engine = create_async_engine(pg_url)
try:
reg = PostgresSessionRegistry(lambda: engine)
await reg.register("alice", "s2", 2.0)
await reg.register("alice", "s1", 1.0) # older score, inserted second
assert await reg.count("alice") == 2
assert [sid for sid, _ in await reg.list_sessions("alice")] == ["s1", "s2"] # oldest-first

await reg.register("alice", "s2", 9.0) # upsert (same session_id) must not duplicate
assert await reg.count("alice") == 2

await reg.deregister("alice", "s1")
assert await reg.count("alice") == 1
assert [sid for sid, _ in await reg.list_sessions("alice")] == ["s2"]
finally:
await engine.dispose()
115 changes: 115 additions & 0 deletions tests/session/test_postgres_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Copyright 2026 Firefly Software Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Postgres SessionRegistry adapter (v26.06.68) — unit tests with a SQL-recording fake engine."""

from __future__ import annotations

from typing import Any

import pytest

from pyfly.session.adapters.postgres_registry import PostgresSessionRegistry
from pyfly.session.concurrency import SessionRegistry


class _FakeResult:
def __init__(self, rows: list | None = None, scalar: Any = None) -> None:
self._rows = rows or []
self._scalar = scalar

def fetchall(self) -> list:
return self._rows

def scalar(self) -> Any:
return self._scalar


class _FakeConn:
def __init__(self, sql_log: list[str]) -> None:
self._sql_log = sql_log

async def __aenter__(self) -> _FakeConn:
return self

async def __aexit__(self, *exc: Any) -> bool:
return False

async def execute(self, statement: Any, params: dict | None = None) -> _FakeResult:
self._sql_log.append(str(statement))
return _FakeResult()


class _FakeEngine:
def __init__(self) -> None:
self.sql: list[str] = []

def begin(self) -> _FakeConn:
return _FakeConn(self.sql)

def connect(self) -> _FakeConn:
return _FakeConn(self.sql)


def test_rejects_invalid_table_name() -> None:
with pytest.raises(ValueError, match="table name"):
PostgresSessionRegistry(lambda: _FakeEngine(), table="bad; DROP TABLE users")


@pytest.mark.asyncio
async def test_satisfies_session_registry_protocol() -> None:
assert isinstance(PostgresSessionRegistry(lambda: _FakeEngine()), SessionRegistry)


@pytest.mark.asyncio
async def test_register_issues_upsert_and_creates_table_once() -> None:
engine = _FakeEngine()
reg = PostgresSessionRegistry(lambda: engine)
await reg.register("alice", "s1", 1.0)
await reg.register("alice", "s2", 2.0)
joined = " ".join(engine.sql)
assert "CREATE TABLE IF NOT EXISTS" in joined
assert joined.count("CREATE TABLE IF NOT EXISTS") == 1 # table ensured once, not per call
assert "INSERT INTO" in joined and "ON CONFLICT (session_id)" in joined


@pytest.mark.asyncio
async def test_list_orders_by_created_at_and_count_uses_count() -> None:
engine = _FakeEngine()
reg = PostgresSessionRegistry(lambda: engine)
await reg.list_sessions("alice")
await reg.count("alice")
joined = " ".join(engine.sql)
assert "ORDER BY created_at ASC" in joined
assert "SELECT COUNT(*)" in joined


@pytest.mark.asyncio
async def test_deregister_issues_delete() -> None:
engine = _FakeEngine()
reg = PostgresSessionRegistry(lambda: engine)
await reg.deregister("alice", "s1")
assert any("DELETE FROM" in s for s in engine.sql)


def test_provider_postgres_selection() -> None:
from pyfly.container.container import Container
from pyfly.core.config import Config
from pyfly.session.adapters.memory import InMemorySessionStore
from pyfly.session.auto_configuration import SessionConcurrencyAutoConfiguration

cfg = Config({"pyfly": {"session": {"concurrency": {"registry": "postgres"}}}})
controller = SessionConcurrencyAutoConfiguration().session_concurrency_controller(
cfg, InMemorySessionStore(), Container()
)
assert isinstance(controller._registry, PostgresSessionRegistry)
Loading
Loading