From 637adb5a21c3b28b037a32788cca21e114d5a15b Mon Sep 17 00:00:00 2001 From: Dimitri Yatsenko Date: Wed, 10 Jun 2026 16:24:51 -0500 Subject: [PATCH] feat(deploy): set_replica_identity for PostgreSQL CDC (#1447) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds dj.deploy.set_replica_identity(target, mode, dry_run) — an idempotent deployment-time operation that applies ALTER TABLE ... REPLICA IDENTITY DEFAULT|FULL across a Schema or single Table on PostgreSQL. Required by some CDC consumers (Databricks Lakehouse Sync mandates FULL; silently skips tables without it). The ALTER is metadata-only and instant; the cost is in WAL volume on subsequent UPDATE/DELETE. Why a new module rather than dj.migrate or auto-emit in declare(): - Not a migration. Nothing legacy is being fixed; this configures an existing schema for an environment-specific consumer requirement. - Not a declare-time concern. Mixing auto-emit-on-declare with a separate utility for existing tables produces mixed state (new tables FULL, old tables DEFAULT) until both run. Migration-only is one coherent path. - Future operational helpers (publication membership, vacuum/reindex, grants) belong with set_replica_identity, not in migrate.py. dj.deploy is the home for that category. Shape: - adapters/postgres.py — replica_identity_ddl(full_name, mode) emits the ALTER TABLE statement. Pure DDL emitter; "default" and "full" both produce explicit ALTERs. Invalid mode raises DataJointError. - deploy.py (new) — set_replica_identity dispatches Schema vs Table, routes through the PG adapter, raises on non-PG backends, supports dry_run, returns {tables_analyzed, tables_modified, ddl}. - __init__.py — exports dj.deploy. Tests: - Unit tests for the adapter DDL (full, default, invalid mode). - Unit tests for deploy.set_replica_identity (mode validation, target validation, non-PG rejection, dry_run, apply, default mode, empty schema). All use stub adapters so no live PG is required. Slated for DataJoint 2.3. --- src/datajoint/__init__.py | 2 + src/datajoint/adapters/postgres.py | 25 +++++ src/datajoint/deploy.py | 157 +++++++++++++++++++++++++++++ tests/unit/test_adapters.py | 17 ++++ tests/unit/test_deploy.py | 109 ++++++++++++++++++++ 5 files changed, 310 insertions(+) create mode 100644 src/datajoint/deploy.py create mode 100644 tests/unit/test_deploy.py diff --git a/src/datajoint/__init__.py b/src/datajoint/__init__.py index 4970b19d4..552e89a4c 100644 --- a/src/datajoint/__init__.py +++ b/src/datajoint/__init__.py @@ -57,6 +57,7 @@ # Other "errors", "migrate", + "deploy", "DataJointError", "ThreadSafetyError", "logger", @@ -69,6 +70,7 @@ # ============================================================================= from . import errors from . import migrate +from . import deploy from .codecs import ( Codec, get_codec, diff --git a/src/datajoint/adapters/postgres.py b/src/datajoint/adapters/postgres.py index 543e972d3..55ea189dc 100644 --- a/src/datajoint/adapters/postgres.py +++ b/src/datajoint/adapters/postgres.py @@ -1266,6 +1266,31 @@ def enum_type_ddl(self, type_name: str, values: list[str]) -> str | None: quoted_values = ", ".join(f"'{v}'" for v in values) return f"CREATE TYPE {self.quote_identifier(type_name)} AS ENUM ({quoted_values})" + def replica_identity_ddl(self, full_table_name: str, mode: str) -> str: + """ + Generate ALTER TABLE ... REPLICA IDENTITY statement. + + Controls how much of the old row PostgreSQL writes to WAL on UPDATE/DELETE. + ``"default"`` logs only primary-key columns; ``"full"`` logs the entire row. + Required by some CDC tools (e.g. Databricks Lakehouse Sync) that need the + full pre-image to drive Slowly-Changing-Dimension history. + + The ALTER is metadata-only, instant, and idempotent — re-applying the same + mode is a no-op at the storage layer. + + Examples + -------- + >>> adapter.replica_identity_ddl('"schema"."table"', 'full') + 'ALTER TABLE "schema"."table" REPLICA IDENTITY FULL' + >>> adapter.replica_identity_ddl('"schema"."table"', 'default') + 'ALTER TABLE "schema"."table" REPLICA IDENTITY DEFAULT' + """ + if mode not in ("default", "full"): + from ..errors import DataJointError + + raise DataJointError(f"Unsupported replica_identity mode: {mode!r}. Expected 'default' or 'full'.") + return f"ALTER TABLE {full_table_name} REPLICA IDENTITY {mode.upper()}" + def get_pending_enum_ddl(self, schema_name: str) -> list[str]: """ Get DDL statements for pending enum types and clear the pending list. diff --git a/src/datajoint/deploy.py b/src/datajoint/deploy.py new file mode 100644 index 000000000..14fd2992c --- /dev/null +++ b/src/datajoint/deploy.py @@ -0,0 +1,157 @@ +""" +Deployment-time operations for configuring an existing DataJoint pipeline. + +This module hosts idempotent operational helpers — things you run as part of a +deploy hook to configure a schema for its environment, distinct from +:mod:`datajoint.migrate` which handles one-shot schema/state evolution. + +The boundary between the two: + +- :mod:`datajoint.migrate` — fix legacy state, evolve a schema definition, + retroactive corrections. Cadence: one-shot. Examples: ``migrate_columns``, + ``add_job_metadata_columns``, ``rebuild_lineage``. +- :mod:`datajoint.deploy` — configure an environment for a consumer's + requirements (CDC tools, replication, role grants, performance tuning). + Cadence: re-runnable, idempotent. Examples: :func:`set_replica_identity`. + +Functions in this module should be safe to call repeatedly from a deploy hook +without accumulating side effects. +""" + +from __future__ import annotations + +from typing import Any + +from .errors import DataJointError + + +def set_replica_identity(target: Any, mode: str = "full", dry_run: bool = True) -> dict: + """ + Apply ``ALTER TABLE ... REPLICA IDENTITY `` to a schema or table on PostgreSQL. + + ``REPLICA IDENTITY`` controls how much of the **old row** PostgreSQL writes to + the write-ahead log on UPDATE/DELETE. Under ``DEFAULT``, only primary-key + columns appear in WAL; under ``FULL``, the entire old row does. + + Why this exists + --------------- + Some change-data-capture (CDC) consumers require the full row pre-image to + drive their downstream models. The canonical example is **Databricks + Lakehouse Sync**: tables without ``REPLICA IDENTITY FULL`` are silently + skipped by the sync — no error, just missing data downstream. Other CDC + tools (Debezium, ClickHouse ClickPipes, Azure CDC) work fine with + ``DEFAULT`` when tables have a primary key; only Databricks mandates + ``FULL``. + + This helper is the **operational** way to apply the setting. It is not a + migration: there's no legacy state being fixed; the setting is simply a + property of the deployment environment, and a fresh declare in a new + environment may need it re-applied. It is idempotent — re-applying the + same mode is a no-op at the storage layer — so it is safe to call from a + deploy hook on every release. + + Cost + ---- + The ALTER itself is metadata-only and instant. The cost is in WAL volume + after the change: UPDATE/DELETE on tables with FULL log the entire old row, + which can be sizable on tables with TOASTed bytea columns. For DataJoint's + typical insert-append workload, this cost is negligible. The notable + scenario is bulk ``delete()`` on tables with ```` columns — a + transient WAL burst proportional to the deleted-row payload size. + + Compliance considerations + ------------------------- + Under ``DEFAULT``, only primary-key values appear in WAL. Under ``FULL``, + entire rows do — including any PHI/PII/sensitive columns. For self-hosted + PostgreSQL with unrestricted WAL access this is a real consideration; for + managed PostgreSQL with logical replication confined to a specific + subscriber (Lakebase, RDS), WAL stays inside the managed environment's + security boundary. Apply intentionally. + + Parameters + ---------- + target : Schema or Table + A :class:`datajoint.Schema` (all user tables) or a + :class:`datajoint.Table` class/instance (just that table). + mode : str, default ``"full"`` + ``"default"`` (PK only, minimal WAL) or ``"full"`` (entire row). + dry_run : bool, default ``True`` + If True, collect the DDL statements but do not execute. Set to False + to actually apply. + + Returns + ------- + dict + - ``tables_analyzed`` (int): number of tables considered. + - ``tables_modified`` (int): number of tables on which the ALTER ran. + Always 0 when ``dry_run=True``. + - ``ddl`` (list[str]): the DDL statements that were (or would be) executed. + + Raises + ------ + DataJointError + If the target's backend is not PostgreSQL, or if ``mode`` is not one of + ``"default"`` / ``"full"``. + + Examples + -------- + >>> from datajoint.deploy import set_replica_identity + >>> # Preview + >>> set_replica_identity(my_schema, mode="full", dry_run=True) + {'tables_analyzed': 12, 'tables_modified': 0, 'ddl': ['ALTER TABLE "ms"."t1" REPLICA IDENTITY FULL', ...]} + >>> # Apply + >>> set_replica_identity(my_schema, mode="full", dry_run=False) + {'tables_analyzed': 12, 'tables_modified': 12, 'ddl': [...]} + >>> # Single table + >>> set_replica_identity(MyTable, mode="full", dry_run=False) + + See Also + -------- + PostgreSQL: `Logical Replication — Replica Identity + `_. + Databricks: `Lakehouse Sync + `_. + """ + if mode not in ("default", "full"): + raise DataJointError(f"mode must be 'default' or 'full'; got {mode!r}") + + from .schemas import _Schema + from .table import Table + + if isinstance(target, _Schema): + connection = target.connection + assert connection is not None, "Schema has no active connection" + adapter = connection.adapter + assert target.database is not None, "Schema is not activated" + tables = [adapter.make_full_table_name(target.database, t) for t in target.list_tables()] + elif isinstance(target, type) and issubclass(target, Table): + instance = target() + connection = instance.connection + assert connection is not None, "Table has no active connection" + adapter = connection.adapter + tables = [instance.full_table_name] + elif isinstance(target, Table): + connection = target.connection + assert connection is not None, "Table has no active connection" + adapter = connection.adapter + tables = [target.full_table_name] + else: + raise DataJointError(f"target must be a Schema or Table class/instance; got {type(target).__name__}") + + if not hasattr(adapter, "replica_identity_ddl"): + raise DataJointError( + f"set_replica_identity is PostgreSQL-only; the {adapter.backend} adapter " "does not support REPLICA IDENTITY." + ) + + result: dict[str, Any] = { + "tables_analyzed": len(tables), + "tables_modified": 0, + "ddl": [], + } + for full_name in tables: + ddl = adapter.replica_identity_ddl(full_name, mode) # type: ignore[attr-defined] + result["ddl"].append(ddl) + if not dry_run: + connection.query(ddl) + result["tables_modified"] += 1 + return result diff --git a/tests/unit/test_adapters.py b/tests/unit/test_adapters.py index edbff9d52..5b7e6a96e 100644 --- a/tests/unit/test_adapters.py +++ b/tests/unit/test_adapters.py @@ -532,6 +532,23 @@ def test_enum_type_ddl_postgres(self, postgres_adapter): result = postgres_adapter.enum_type_ddl("status_type", ["active", "inactive"]) assert result == "CREATE TYPE \"status_type\" AS ENUM ('active', 'inactive')" + def test_replica_identity_ddl_full(self, postgres_adapter): + """Test PostgreSQL replica identity DDL for 'full' mode.""" + result = postgres_adapter.replica_identity_ddl('"schema"."table"', "full") + assert result == 'ALTER TABLE "schema"."table" REPLICA IDENTITY FULL' + + def test_replica_identity_ddl_default(self, postgres_adapter): + """Test PostgreSQL replica identity DDL for 'default' mode.""" + result = postgres_adapter.replica_identity_ddl('"schema"."table"', "default") + assert result == 'ALTER TABLE "schema"."table" REPLICA IDENTITY DEFAULT' + + def test_replica_identity_ddl_invalid_mode(self, postgres_adapter): + """Invalid mode raises DataJointError.""" + from datajoint.errors import DataJointError + + with pytest.raises(DataJointError, match="Unsupported replica_identity mode"): + postgres_adapter.replica_identity_ddl('"schema"."table"', "nothing") + def test_job_metadata_columns_postgres(self, postgres_adapter): """Test PostgreSQL job metadata columns.""" result = postgres_adapter.job_metadata_columns() diff --git a/tests/unit/test_deploy.py b/tests/unit/test_deploy.py new file mode 100644 index 000000000..18c54519d --- /dev/null +++ b/tests/unit/test_deploy.py @@ -0,0 +1,109 @@ +""" +Unit tests for :mod:`datajoint.deploy`. + +These tests do not require a live PostgreSQL connection — they cover dispatch, +validation, and DDL string generation against the actual ``PostgreSQLAdapter`` +and a stub adapter for the non-PG path. +""" + +from __future__ import annotations + +import pytest + +import datajoint as dj +from datajoint.deploy import set_replica_identity +from datajoint.errors import DataJointError + + +class _FakeAdapter: + """Bare-minimum adapter stub for testing dispatch (PostgreSQL-shaped).""" + + backend = "postgresql" + + def make_full_table_name(self, schema: str, table: str) -> str: + return f'"{schema}"."{table}"' + + def replica_identity_ddl(self, full_table_name: str, mode: str) -> str: + return f"ALTER TABLE {full_table_name} REPLICA IDENTITY {mode.upper()}" + + +class _MySQLLikeAdapter: + """Adapter without ``replica_identity_ddl`` (MySQL-shaped).""" + + backend = "mysql" + + def make_full_table_name(self, schema: str, table: str) -> str: + return f"`{schema}`.`{table}`" + + +class _FakeConnection: + """Connection stub that records queries instead of executing them.""" + + def __init__(self, adapter: object) -> None: + self.adapter = adapter + self.queries: list[str] = [] + + def query(self, sql: str) -> None: + self.queries.append(sql) + + +class _FakeSchema(dj.schemas._Schema): + """Schema stub bypassing __init__ wiring; sets just what set_replica_identity uses.""" + + def __init__(self, database: str, table_names: list[str], adapter: object) -> None: + # Skip dj.Schema.__init__ — fabricate the minimal attributes. + self.database = database + self._tables = table_names + self.connection = _FakeConnection(adapter) + + def list_tables(self) -> list[str]: + return self._tables + + +def test_set_replica_identity_rejects_invalid_mode(): + schema = _FakeSchema("ms", ["t1"], _FakeAdapter()) + with pytest.raises(DataJointError, match="mode must be 'default' or 'full'"): + set_replica_identity(schema, mode="nothing") + + +def test_set_replica_identity_rejects_bad_target(): + with pytest.raises(DataJointError, match="must be a Schema or Table"): + set_replica_identity("not a schema", mode="full") + + +def test_set_replica_identity_rejects_non_postgresql(): + schema = _FakeSchema("ms", ["t1", "t2"], _MySQLLikeAdapter()) + with pytest.raises(DataJointError, match="PostgreSQL-only"): + set_replica_identity(schema, mode="full") + + +def test_set_replica_identity_dry_run_no_execute(): + schema = _FakeSchema("ms", ["t1", "t2"], _FakeAdapter()) + result = set_replica_identity(schema, mode="full", dry_run=True) + assert result["tables_analyzed"] == 2 + assert result["tables_modified"] == 0 + assert result["ddl"] == [ + 'ALTER TABLE "ms"."t1" REPLICA IDENTITY FULL', + 'ALTER TABLE "ms"."t2" REPLICA IDENTITY FULL', + ] + assert schema.connection.queries == [] + + +def test_set_replica_identity_apply_runs_alters(): + schema = _FakeSchema("ms", ["t1", "t2"], _FakeAdapter()) + result = set_replica_identity(schema, mode="full", dry_run=False) + assert result["tables_analyzed"] == 2 + assert result["tables_modified"] == 2 + assert schema.connection.queries == result["ddl"] + + +def test_set_replica_identity_default_mode_emits_default_ddl(): + schema = _FakeSchema("ms", ["t1"], _FakeAdapter()) + result = set_replica_identity(schema, mode="default", dry_run=True) + assert result["ddl"] == ['ALTER TABLE "ms"."t1" REPLICA IDENTITY DEFAULT'] + + +def test_set_replica_identity_empty_schema(): + schema = _FakeSchema("ms", [], _FakeAdapter()) + result = set_replica_identity(schema, mode="full", dry_run=False) + assert result == {"tables_analyzed": 0, "tables_modified": 0, "ddl": []}