Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/datajoint/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
# Other
"errors",
"migrate",
"deploy",
"DataJointError",
"ThreadSafetyError",
"logger",
Expand All @@ -69,6 +70,7 @@
# =============================================================================
from . import errors
from . import migrate
from . import deploy
from .codecs import (
Codec,
get_codec,
Expand Down
25 changes: 25 additions & 0 deletions src/datajoint/adapters/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,31 @@ def enum_type_ddl(self, type_name: str, values: list[str]) -> str | None:
quoted_values = ", ".join(f"'{v}'" for v in values)
return f"CREATE TYPE {self.quote_identifier(type_name)} AS ENUM ({quoted_values})"

def replica_identity_ddl(self, full_table_name: str, mode: str) -> str:
    """
    Build the ``ALTER TABLE ... REPLICA IDENTITY`` statement for one table.

    REPLICA IDENTITY decides how much of the *old* row PostgreSQL records in
    the WAL for UPDATE/DELETE: ``"default"`` keeps just the primary-key
    columns, ``"full"`` keeps the whole row. Certain CDC consumers (e.g.
    Databricks Lakehouse Sync) need the full pre-image to maintain
    Slowly-Changing-Dimension history.

    The statement is metadata-only, instant, and idempotent; issuing it again
    with the same mode changes nothing at the storage layer.

    Parameters
    ----------
    full_table_name : str
        Already-quoted, schema-qualified table name; it is interpolated into
        the statement verbatim.
    mode : str
        Either ``"default"`` or ``"full"`` (lower-case, exact match).

    Returns
    -------
    str
        The DDL statement, with the mode upper-cased.

    Raises
    ------
    DataJointError
        If ``mode`` is anything other than ``"default"`` or ``"full"``.

    Examples
    --------
    >>> adapter.replica_identity_ddl('"schema"."table"', 'full')
    'ALTER TABLE "schema"."table" REPLICA IDENTITY FULL'
    >>> adapter.replica_identity_ddl('"schema"."table"', 'default')
    'ALTER TABLE "schema"."table" REPLICA IDENTITY DEFAULT'
    """
    if mode == "default" or mode == "full":
        keyword = mode.upper()
        return f"ALTER TABLE {full_table_name} REPLICA IDENTITY {keyword}"

    # Only reached for an unsupported mode; the error class is imported lazily.
    from ..errors import DataJointError

    raise DataJointError(f"Unsupported replica_identity mode: {mode!r}. Expected 'default' or 'full'.")

def get_pending_enum_ddl(self, schema_name: str) -> list[str]:
"""
Get DDL statements for pending enum types and clear the pending list.
Expand Down
157 changes: 157 additions & 0 deletions src/datajoint/deploy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
"""
Deployment-time operations for configuring an existing DataJoint pipeline.

This module hosts idempotent operational helpers — things you run as part of a
deploy hook to configure a schema for its environment, distinct from
:mod:`datajoint.migrate` which handles one-shot schema/state evolution.

The boundary between the two:

- :mod:`datajoint.migrate` — fix legacy state, evolve a schema definition,
retroactive corrections. Cadence: one-shot. Examples: ``migrate_columns``,
``add_job_metadata_columns``, ``rebuild_lineage``.
- :mod:`datajoint.deploy` — configure an environment for a consumer's
requirements (CDC tools, replication, role grants, performance tuning).
Cadence: re-runnable, idempotent. Examples: :func:`set_replica_identity`.

Functions in this module should be safe to call repeatedly from a deploy hook
without accumulating side effects.
"""

from __future__ import annotations

from typing import Any

from .errors import DataJointError


def set_replica_identity(target: Any, mode: str = "full", dry_run: bool = True) -> dict:
    """
    Apply ``ALTER TABLE ... REPLICA IDENTITY <mode>`` to a schema or table on PostgreSQL.

    ``REPLICA IDENTITY`` controls how much of the **old row** PostgreSQL writes to
    the write-ahead log on UPDATE/DELETE. Under ``DEFAULT``, only primary-key
    columns appear in WAL; under ``FULL``, the entire old row does.

    Why this exists
    ---------------
    Some change-data-capture (CDC) consumers require the full row pre-image to
    drive their downstream models. The canonical example is **Databricks
    Lakehouse Sync**: tables without ``REPLICA IDENTITY FULL`` are silently
    skipped by the sync — no error, just missing data downstream. Other CDC
    tools (Debezium, ClickHouse ClickPipes, Azure CDC) work fine with
    ``DEFAULT`` when tables have a primary key; only Databricks mandates
    ``FULL``.

    This helper is the **operational** way to apply the setting. It is not a
    migration: there's no legacy state being fixed; the setting is simply a
    property of the deployment environment, and a fresh declare in a new
    environment may need it re-applied. It is idempotent — re-applying the
    same mode is a no-op at the storage layer — so it is safe to call from a
    deploy hook on every release.

    Cost
    ----
    The ALTER itself is metadata-only and instant. The cost is in WAL volume
    after the change: UPDATE/DELETE on tables with FULL log the entire old row,
    which can be sizable on tables with TOASTed bytea columns. For DataJoint's
    typical insert-append workload, this cost is negligible. The notable
    scenario is bulk ``delete()`` on tables with ``<blob>`` columns — a
    transient WAL burst proportional to the deleted-row payload size.

    Compliance considerations
    -------------------------
    Under ``DEFAULT``, only primary-key values appear in WAL. Under ``FULL``,
    entire rows do — including any PHI/PII/sensitive columns. For self-hosted
    PostgreSQL with unrestricted WAL access this is a real consideration; for
    managed PostgreSQL with logical replication confined to a specific
    subscriber (Lakebase, RDS), WAL stays inside the managed environment's
    security boundary. Apply intentionally.

    Parameters
    ----------
    target : Schema or Table
        A :class:`datajoint.Schema` (all user tables) or a
        :class:`datajoint.Table` class/instance (just that table).
    mode : str, default ``"full"``
        ``"default"`` (PK only, minimal WAL) or ``"full"`` (entire row).
    dry_run : bool, default ``True``
        If True, collect the DDL statements but do not execute. Set to False
        to actually apply.

    Returns
    -------
    dict
        - ``tables_analyzed`` (int): number of tables considered.
        - ``tables_modified`` (int): number of tables on which the ALTER ran.
          Always 0 when ``dry_run=True``.
        - ``ddl`` (list[str]): the DDL statements that were (or would be) executed.

    Raises
    ------
    DataJointError
        If ``mode`` is not one of ``"default"`` / ``"full"``; if ``target`` is
        not a Schema or a Table class/instance; if the schema is not activated
        or the schema/table has no active connection; or if the target's
        adapter does not provide ``replica_identity_ddl`` (i.e. the backend is
        not PostgreSQL).

    Examples
    --------
    >>> from datajoint.deploy import set_replica_identity
    >>> # Preview
    >>> set_replica_identity(my_schema, mode="full", dry_run=True)
    {'tables_analyzed': 12, 'tables_modified': 0, 'ddl': ['ALTER TABLE "ms"."t1" REPLICA IDENTITY FULL', ...]}
    >>> # Apply
    >>> set_replica_identity(my_schema, mode="full", dry_run=False)
    {'tables_analyzed': 12, 'tables_modified': 12, 'ddl': [...]}
    >>> # Single table
    >>> set_replica_identity(MyTable, mode="full", dry_run=False)

    See Also
    --------
    PostgreSQL: `Logical Replication — Replica Identity
    <https://www.postgresql.org/docs/current/logical-replication-publication.html>`_.
    Databricks: `Lakehouse Sync
    <https://docs.databricks.com/aws/en/oltp/projects/lakehouse-sync>`_.
    """
    # Validate the mode first so a bad value fails before the target is inspected.
    if mode not in ("default", "full"):
        raise DataJointError(f"mode must be 'default' or 'full'; got {mode!r}")

    # Function-scope imports (presumably to avoid an import cycle with the
    # package's core modules — confirm before hoisting to module level).
    from .schemas import _Schema
    from .table import Table

    # Precondition failures raise DataJointError explicitly instead of using
    # ``assert``: assertions are removed under ``python -O``, which would turn
    # these user-facing errors into an opaque AttributeError on ``None``.
    if isinstance(target, _Schema):
        connection = target.connection
        if connection is None:
            raise DataJointError("Schema has no active connection")
        adapter = connection.adapter
        if target.database is None:
            raise DataJointError("Schema is not activated")
        tables = [adapter.make_full_table_name(target.database, name) for name in target.list_tables()]
    else:
        # A Table class is instantiated so both class and instance targets go
        # through the same connection / full_table_name lookup.
        if isinstance(target, type) and issubclass(target, Table):
            table = target()
        elif isinstance(target, Table):
            table = target
        else:
            raise DataJointError(f"target must be a Schema or Table class/instance; got {type(target).__name__}")
        connection = table.connection
        if connection is None:
            raise DataJointError("Table has no active connection")
        adapter = connection.adapter
        tables = [table.full_table_name]

    # Backend detection is capability-based: only adapters that can render the
    # DDL (the PostgreSQL adapter) expose ``replica_identity_ddl``.
    if not hasattr(adapter, "replica_identity_ddl"):
        raise DataJointError(
            f"set_replica_identity is PostgreSQL-only; the {adapter.backend} adapter "
            f"does not support REPLICA IDENTITY."
        )

    result: dict[str, Any] = {
        "tables_analyzed": len(tables),
        "tables_modified": 0,
        "ddl": [],
    }
    for full_name in tables:
        ddl = adapter.replica_identity_ddl(full_name, mode)  # type: ignore[attr-defined]
        result["ddl"].append(ddl)
        if not dry_run:
            # One statement per table; the counter reflects ALTERs actually issued.
            connection.query(ddl)
            result["tables_modified"] += 1
    return result
17 changes: 17 additions & 0 deletions tests/unit/test_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,23 @@ def test_enum_type_ddl_postgres(self, postgres_adapter):
result = postgres_adapter.enum_type_ddl("status_type", ["active", "inactive"])
assert result == "CREATE TYPE \"status_type\" AS ENUM ('active', 'inactive')"

def test_replica_identity_ddl_full(self, postgres_adapter):
    """'full' mode renders an ``ALTER TABLE ... REPLICA IDENTITY FULL`` statement."""
    expected = 'ALTER TABLE "schema"."table" REPLICA IDENTITY FULL'
    ddl = postgres_adapter.replica_identity_ddl('"schema"."table"', "full")
    assert ddl == expected

def test_replica_identity_ddl_default(self, postgres_adapter):
    """'default' mode renders an ``ALTER TABLE ... REPLICA IDENTITY DEFAULT`` statement."""
    expected = 'ALTER TABLE "schema"."table" REPLICA IDENTITY DEFAULT'
    ddl = postgres_adapter.replica_identity_ddl('"schema"."table"', "default")
    assert ddl == expected

def test_replica_identity_ddl_invalid_mode(self, postgres_adapter):
    """A mode other than 'default'/'full' makes the adapter raise DataJointError."""
    from datajoint.errors import DataJointError

    quoted_table = '"schema"."table"'
    with pytest.raises(DataJointError, match="Unsupported replica_identity mode"):
        postgres_adapter.replica_identity_ddl(quoted_table, "nothing")

def test_job_metadata_columns_postgres(self, postgres_adapter):
"""Test PostgreSQL job metadata columns."""
result = postgres_adapter.job_metadata_columns()
Expand Down
109 changes: 109 additions & 0 deletions tests/unit/test_deploy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""
Unit tests for :mod:`datajoint.deploy`.

These tests do not require a live PostgreSQL connection — they cover dispatch,
validation, and DDL string generation against the actual ``PostgreSQLAdapter``
and a stub adapter for the non-PG path.
"""

from __future__ import annotations

import pytest

import datajoint as dj
from datajoint.deploy import set_replica_identity
from datajoint.errors import DataJointError


class _FakeAdapter:
"""Bare-minimum adapter stub for testing dispatch (PostgreSQL-shaped)."""

backend = "postgresql"

def make_full_table_name(self, schema: str, table: str) -> str:
return f'"{schema}"."{table}"'

def replica_identity_ddl(self, full_table_name: str, mode: str) -> str:
return f"ALTER TABLE {full_table_name} REPLICA IDENTITY {mode.upper()}"


class _MySQLLikeAdapter:
"""Adapter without ``replica_identity_ddl`` (MySQL-shaped)."""

backend = "mysql"

def make_full_table_name(self, schema: str, table: str) -> str:
return f"`{schema}`.`{table}`"


class _FakeConnection:
"""Connection stub that records queries instead of executing them."""

def __init__(self, adapter: object) -> None:
self.adapter = adapter
self.queries: list[str] = []

def query(self, sql: str) -> None:
self.queries.append(sql)


class _FakeSchema(dj.schemas._Schema):
    """Schema stand-in that skips the real constructor and sets only what set_replica_identity reads."""

    def __init__(self, database: str, table_names: list[str], adapter: object) -> None:
        # The parent __init__ is intentionally not called; only the minimal
        # attributes are fabricated here.
        self.database = database
        self._tables = table_names
        # The connection is a recording stub wrapping the supplied adapter.
        self.connection = _FakeConnection(adapter)

    def list_tables(self) -> list[str]:
        """Return the table names given at construction."""
        return self._tables


def test_set_replica_identity_rejects_invalid_mode():
    """An unrecognised ``mode`` is rejected with a DataJointError."""
    single_table_schema = _FakeSchema("ms", ["t1"], _FakeAdapter())
    expected_message = "mode must be 'default' or 'full'"
    with pytest.raises(DataJointError, match=expected_message):
        set_replica_identity(single_table_schema, mode="nothing")


def test_set_replica_identity_rejects_bad_target():
    """A target that is neither a Schema nor a Table is rejected with a DataJointError."""
    bogus_target = "not a schema"
    with pytest.raises(DataJointError, match="must be a Schema or Table"):
        set_replica_identity(bogus_target, mode="full")


def test_set_replica_identity_rejects_non_postgresql():
    """An adapter lacking ``replica_identity_ddl`` triggers the PostgreSQL-only error."""
    mysql_schema = _FakeSchema("ms", ["t1", "t2"], _MySQLLikeAdapter())
    with pytest.raises(DataJointError, match="PostgreSQL-only"):
        set_replica_identity(mysql_schema, mode="full")


def test_set_replica_identity_dry_run_no_execute():
    """A dry run reports the DDL for every table but sends nothing to the connection."""
    fake_schema = _FakeSchema("ms", ["t1", "t2"], _FakeAdapter())
    expected_ddl = [
        'ALTER TABLE "ms"."t1" REPLICA IDENTITY FULL',
        'ALTER TABLE "ms"."t2" REPLICA IDENTITY FULL',
    ]

    report = set_replica_identity(fake_schema, mode="full", dry_run=True)

    assert report["tables_analyzed"] == 2
    assert report["tables_modified"] == 0
    assert report["ddl"] == expected_ddl
    # Nothing may have reached the recording connection.
    assert fake_schema.connection.queries == []


def test_set_replica_identity_apply_runs_alters():
    """With ``dry_run=False`` every generated ALTER is sent to the connection, in order."""
    fake_schema = _FakeSchema("ms", ["t1", "t2"], _FakeAdapter())

    report = set_replica_identity(fake_schema, mode="full", dry_run=False)

    assert report["tables_analyzed"] == 2
    assert report["tables_modified"] == 2
    # The recorded queries must be exactly the reported DDL statements.
    executed = fake_schema.connection.queries
    assert executed == report["ddl"]


def test_set_replica_identity_default_mode_emits_default_ddl():
    """``mode="default"`` produces a ``REPLICA IDENTITY DEFAULT`` statement."""
    fake_schema = _FakeSchema("ms", ["t1"], _FakeAdapter())
    expected_ddl = ['ALTER TABLE "ms"."t1" REPLICA IDENTITY DEFAULT']

    report = set_replica_identity(fake_schema, mode="default", dry_run=True)

    assert report["ddl"] == expected_ddl


def test_set_replica_identity_empty_schema():
    """A schema with no tables yields an all-zero report, even when applying."""
    empty_schema = _FakeSchema("ms", [], _FakeAdapter())
    expected_report = {"tables_analyzed": 0, "tables_modified": 0, "ddl": []}

    report = set_replica_identity(empty_schema, mode="full", dry_run=False)

    assert report == expected_report
Loading