diff --git a/docs/reference/adapters/psycopg.rst b/docs/reference/adapters/psycopg.rst index 1aa24dde5..3eb99697f 100644 --- a/docs/reference/adapters/psycopg.rst +++ b/docs/reference/adapters/psycopg.rst @@ -5,6 +5,27 @@ Psycopg PostgreSQL adapter using `psycopg 3 `_ with both sync and async support. Features native pipeline mode for multi-statement batching. +Cloud Connectivity +================== + +``PsycopgSyncConfig`` can create sync pools through the Google AlloyDB in-process +connector when both ``psycopg`` and ``google-cloud-alloydb-connector`` are installed: + +.. code-block:: python + + from sqlspec.adapters.psycopg import PsycopgSyncConfig + + config = PsycopgSyncConfig( + connection_config={"user": "app", "password": "secret", "dbname": "orders"}, + driver_features={ + "enable_alloydb": True, + "alloydb_instance_uri": "projects/project/locations/us-central1/clusters/cluster/instances/instance", + }, + ) + +This connector path is sync-only. ``PsycopgAsyncConfig`` continues to use regular +psycopg async connection configuration. + Sync Configuration ================== diff --git a/sqlspec/adapters/psycopg/config.py b/sqlspec/adapters/psycopg/config.py index 22751f509..51666618c 100644 --- a/sqlspec/adapters/psycopg/config.py +++ b/sqlspec/adapters/psycopg/config.py @@ -3,8 +3,9 @@ from typing import TYPE_CHECKING, Any, ClassVar, Literal, TypedDict, cast from mypy_extensions import mypyc_attr +from psycopg import Connection as PsycopgConnection from psycopg_pool import AsyncConnectionPool, ConnectionPool -from typing_extensions import NotRequired +from typing_extensions import NotRequired, Self from sqlspec.adapters.psycopg._typing import ( PsycopgAsyncConnection, @@ -31,8 +32,9 @@ from sqlspec.config import AsyncDatabaseConfig, ExtensionConfigs, SyncDatabaseConfig from sqlspec.driver._async import AsyncPoolConnectionContext, AsyncPoolSessionFactory from sqlspec.driver._sync import SyncPoolConnectionContext, SyncPoolSessionFactory -from sqlspec.exceptions import ImproperConfigurationError +from sqlspec.exceptions import ImproperConfigurationError, MissingDependencyError from sqlspec.extensions.events import EventRuntimeHints +from sqlspec.typing import ALLOYDB_CONNECTOR_INSTALLED from sqlspec.utils.config_tools import normalize_connection_config if TYPE_CHECKING: @@ -58,6 +60,8 @@ PsycopgSSLMode = Literal["disable", "allow", "prefer", "require", "verify-ca", "verify-full"] +_ALLOYDB_DIRECT_CONNECTION_KEYS = ("conninfo", "host", "hostaddr", "port", "user", "password", "dbname", "db") +_ALLOYDB_CONNECTOR_PACKAGE = "google-cloud-alloydb-connector" class PsycopgConnectionParams(TypedDict): @@ -137,6 +141,13 @@ class PsycopgDriverFeatures(TypedDict): - "table_queue": Durable table-backed queue with retries and exactly-once delivery (current default) - "listen_notify_durable": Hybrid - real-time + durable (available when native support lands) Defaults to "table_queue" until native LISTEN/NOTIFY support is implemented. + enable_alloydb: Enable sync-only Google AlloyDB connector integration. + Requires google-cloud-alloydb-connector. Defaults to False. + alloydb_instance_uri: AlloyDB instance URI in projects/... resource path form. + Required when enable_alloydb is True. + enable_alloydb_iam_auth: Enable AlloyDB IAM database authentication for sync connector connections. + Defaults to False. + alloydb_ip_type: AlloyDB connector IP type. Defaults to PRIVATE. """ enable_pgvector: NotRequired[bool] @@ -146,6 +157,50 @@ class PsycopgDriverFeatures(TypedDict): on_connection_create: NotRequired["Callable[..., Any]"] enable_events: NotRequired[bool] events_backend: NotRequired[str] + enable_alloydb: NotRequired[bool] + alloydb_instance_uri: NotRequired[str] + enable_alloydb_iam_auth: NotRequired[bool] + alloydb_ip_type: NotRequired[str] + + +def _make_alloydb_connection_class( + *, + connector: Any, + instance_uri: str, + enable_iam_auth: bool, + ip_type: str, + user: str | None, + password: str | None, + database: str | None, +) -> "type[PsycopgConnection[Any]]": + """Build a psycopg connection class backed by the AlloyDB connector.""" + + class _AlloyDBPsycopgConnection(PsycopgConnection[Any]): + @classmethod + def connect( + cls, + conninfo: str = "", + *, + autocommit: bool = False, + prepare_threshold: int | None = 5, + context: "AdaptContext | None" = None, + row_factory: "RowFactory[Any] | None" = None, + cursor_factory: "type[Cursor[Any]] | None" = None, + **kwargs: str | int | None, + ) -> Self: + _ = (cls, conninfo, autocommit, prepare_threshold, context, row_factory, cursor_factory) + connector_kwargs = dict(kwargs) + connector_kwargs["enable_iam_auth"] = enable_iam_auth + connector_kwargs["ip_type"] = ip_type + if user is not None: + connector_kwargs["user"] = user + if password is not None: + connector_kwargs["password"] = password + if database is not None: + connector_kwargs["db"] = database + return cast("Self", connector.connect(instance_uri, "psycopg", **connector_kwargs)) + + return _AlloyDBPsycopgConnection class PsycopgSyncConnectionContext(SyncPoolConnectionContext): @@ -252,8 +307,12 @@ def __init__( self._user_connection_hook: Callable[[PsycopgSyncConnection], None] | None = features_dict.pop( "on_connection_create", None ) + features_dict.setdefault("enable_alloydb", False) + features_dict.setdefault("enable_alloydb_iam_auth", False) + features_dict.setdefault("alloydb_ip_type", "PRIVATE") self._pgvector_available: bool | None = None self._paradedb_available: bool | None = None + self._alloydb_connector: Any | None = None super().__init__( connection_config=connection_config, @@ -266,6 +325,57 @@ def __init__( **kwargs, ) + self._validate_alloydb_config() + + def _validate_alloydb_config(self) -> None: + """Validate sync-only AlloyDB connector configuration.""" + if not self.driver_features.get("enable_alloydb", False): + return + + if not ALLOYDB_CONNECTOR_INSTALLED: + raise MissingDependencyError(package=_ALLOYDB_CONNECTOR_PACKAGE, install_package=_ALLOYDB_CONNECTOR_PACKAGE) + + instance_uri = self.driver_features.get("alloydb_instance_uri") + if not instance_uri: + msg = ( + "alloydb_instance_uri required when enable_alloydb is True. " + "Format: 'projects/PROJECT/locations/REGION/clusters/CLUSTER/instances/INSTANCE'" + ) + raise ImproperConfigurationError(msg) + + if not str(instance_uri).startswith("projects/"): + msg = ( + f"Invalid AlloyDB instance URI format: {instance_uri}. " + "Expected format: 'projects/PROJECT/locations/REGION/clusters/CLUSTER/instances/INSTANCE'" + ) + raise ImproperConfigurationError(msg) + + def _setup_alloydb_connector( + self, config: "dict[str, Any]", pool_parameters: "dict[str, Any] | None" = None + ) -> None: + """Setup AlloyDB connector and configure psycopg-pool connection_class.""" + from google.cloud.alloydb.connector import Connector # type: ignore[import-untyped,unused-ignore] + + self._alloydb_connector = Connector() + + user = config.get("user") + password = config.get("password") + database = config.get("dbname") or config.get("db") + + for key in _ALLOYDB_DIRECT_CONNECTION_KEYS: + config.pop(key, None) + + if pool_parameters is not None: + pool_parameters["connection_class"] = _make_alloydb_connection_class( + connector=self._alloydb_connector, + instance_uri=str(self.driver_features["alloydb_instance_uri"]), + enable_iam_auth=bool(self.driver_features.get("enable_alloydb_iam_auth", False)), + ip_type=str(self.driver_features.get("alloydb_ip_type", "PRIVATE")), + user=cast("str | None", user), + password=cast("str | None", password), + database=cast("str | None", database), + ) + def _create_pool(self) -> "ConnectionPool": """Create the actual connection pool.""" all_config = dict(self.connection_config) @@ -295,6 +405,13 @@ def _create_pool(self) -> "ConnectionPool": conninfo = all_config.pop("conninfo", None) kwargs = all_config.pop("kwargs", {}) all_config.update(kwargs) + + if self.driver_features.get("enable_alloydb", False): + if conninfo is not None: + all_config["conninfo"] = conninfo + self._setup_alloydb_connector(all_config, pool_parameters) + conninfo = None + if conninfo: pool = ConnectionPool(conninfo, kwargs=all_config, **pool_parameters) else: @@ -336,14 +453,15 @@ def _configure_connection(self, conn: "PsycopgSyncConnection") -> None: self._user_connection_hook(conn) def _close_pool(self) -> None: - """Close the actual connection pool.""" - if not self.connection_instance: - return - + """Close the actual connection pool and cleanup connectors.""" try: - self.connection_instance.close() + if self.connection_instance: + self.connection_instance.close() finally: self.connection_instance = None + if self._alloydb_connector is not None: + self._alloydb_connector.close() + self._alloydb_connector = None def create_connection(self) -> "PsycopgSyncConnection": """Create a single connection (not from pool). diff --git a/tests/unit/adapters/test_psycopg/test_cloud_connectors.py b/tests/unit/adapters/test_psycopg/test_cloud_connectors.py new file mode 100644 index 000000000..366e485a9 --- /dev/null +++ b/tests/unit/adapters/test_psycopg/test_cloud_connectors.py @@ -0,0 +1,240 @@ +"""Unit tests for Google AlloyDB connector integration in Psycopg.""" + +import sys +from collections.abc import Generator +from typing import Any +from unittest.mock import MagicMock, patch + +import pytest +from psycopg import Connection + +import sqlspec.adapters.psycopg.config as psycopg_config +from sqlspec.adapters.psycopg.config import PsycopgAsyncConfig, PsycopgSyncConfig +from sqlspec.exceptions import ImproperConfigurationError, MissingDependencyError + +# pyright: reportPrivateUsage=false + + +class _CapturedSyncPool: + """Capture psycopg sync pool constructor arguments without opening a connection.""" + + calls: list[tuple[str, dict[str, Any]]] = [] + + def __init__(self, conninfo: str = "", **kwargs: Any) -> None: + self.conninfo = conninfo + self.kwargs = kwargs + self.calls.append((conninfo, kwargs)) + + def close(self) -> None: + return None + + +class _CapturedAsyncPool: + """Capture psycopg async pool constructor arguments without opening a connection.""" + + calls: list[tuple[str, dict[str, Any]]] = [] + open_calls: int = 0 + + def __init__(self, conninfo: str = "", **kwargs: Any) -> None: + self.conninfo = conninfo + self.kwargs = kwargs + self.calls.append((conninfo, kwargs)) + + async def open(self) -> None: + type(self).open_calls += 1 + + +@pytest.fixture(autouse=True) +def disable_alloydb_by_default() -> Generator[None, None, None]: + """Disable the AlloyDB connector by default for stable tests.""" + with patch("sqlspec.adapters.psycopg.config.ALLOYDB_CONNECTOR_INSTALLED", False): + yield + + +@pytest.fixture +def mock_alloydb_module() -> Generator[MagicMock, None, None]: + """Create and register a mock google.cloud.alloydb module.""" + mock_connector_class = MagicMock() + mock_module = MagicMock() + mock_module.connector.Connector = mock_connector_class + + sys.modules["google.cloud.alloydb"] = mock_module + sys.modules["google.cloud.alloydb.connector"] = mock_module.connector + + yield mock_connector_class + + sys.modules.pop("google.cloud.alloydb", None) + sys.modules.pop("google.cloud.alloydb.connector", None) + + +def test_alloydb_defaults_to_false() -> None: + """AlloyDB connector support should be explicit opt-in for sync psycopg.""" + config = PsycopgSyncConfig(connection_config={"dbname": "app"}) + + assert config.driver_features["enable_alloydb"] is False + assert config.driver_features["enable_alloydb_iam_auth"] is False + assert config.driver_features["alloydb_ip_type"] == "PRIVATE" + assert "alloydb_instance_uri" not in config.driver_features + + +def test_alloydb_missing_package_raises_error() -> None: + """Enabling AlloyDB without the connector package should fail during config creation.""" + with pytest.raises(MissingDependencyError, match="google-cloud-alloydb-connector"): + PsycopgSyncConfig( + connection_config={"dbname": "app"}, + driver_features={ + "enable_alloydb": True, + "alloydb_instance_uri": "projects/p/locations/r/clusters/c/instances/i", + }, + ) + + +def test_alloydb_missing_instance_uri_raises_error() -> None: + """AlloyDB requires an instance URI when enabled.""" + with patch("sqlspec.adapters.psycopg.config.ALLOYDB_CONNECTOR_INSTALLED", True): + with pytest.raises( + ImproperConfigurationError, match="alloydb_instance_uri required when enable_alloydb is True" + ): + PsycopgSyncConfig(connection_config={"dbname": "app"}, driver_features={"enable_alloydb": True}) + + +def test_alloydb_invalid_instance_uri_format_raises_error() -> None: + """AlloyDB instance URIs must use the projects/... resource path form.""" + with patch("sqlspec.adapters.psycopg.config.ALLOYDB_CONNECTOR_INSTALLED", True): + with pytest.raises(ImproperConfigurationError, match="Invalid AlloyDB instance URI format"): + PsycopgSyncConfig( + connection_config={"dbname": "app"}, + driver_features={"enable_alloydb": True, "alloydb_instance_uri": "invalid-format"}, + ) + + +def test_user_connection_class_is_preserved_when_alloydb_disabled(monkeypatch: pytest.MonkeyPatch) -> None: + """A user-supplied psycopg connection_class should pass through when AlloyDB is disabled.""" + _CapturedSyncPool.calls.clear() + monkeypatch.setattr(psycopg_config, "ConnectionPool", _CapturedSyncPool) + + PsycopgSyncConfig(connection_config={"dbname": "app", "connection_class": Connection})._create_pool() + + _conninfo, pool_kwargs = _CapturedSyncPool.calls[-1] + assert pool_kwargs["connection_class"] is Connection + assert pool_kwargs["kwargs"] == {"dbname": "app"} + + +def test_alloydb_pool_uses_connector_backed_connection_class( + monkeypatch: pytest.MonkeyPatch, mock_alloydb_module: MagicMock +) -> None: + """AlloyDB should strip direct connection fields and install a connector connection class.""" + mock_connector = MagicMock() + mock_alloydb_module.return_value = mock_connector + _CapturedSyncPool.calls.clear() + monkeypatch.setattr(psycopg_config, "ConnectionPool", _CapturedSyncPool) + + with patch("sqlspec.adapters.psycopg.config.ALLOYDB_CONNECTOR_INSTALLED", True): + config = PsycopgSyncConfig( + connection_config={ + "conninfo": "postgresql://user:pass@localhost/app", + "host": "127.0.0.1", + "port": 5432, + "user": "testuser", + "password": "testpass", + "dbname": "app", + "application_name": "sqlspec", + }, + driver_features={ + "enable_alloydb": True, + "alloydb_instance_uri": "projects/p/locations/r/clusters/c/instances/i", + }, + ) + config._create_pool() + + conninfo, pool_kwargs = _CapturedSyncPool.calls[-1] + connection_class = pool_kwargs["connection_class"] + + assert conninfo == "" + assert issubclass(connection_class, Connection) + assert connection_class is not Connection + assert pool_kwargs["kwargs"] == {"application_name": "sqlspec"} + assert config._alloydb_connector is mock_connector + assert "host" not in pool_kwargs["kwargs"] + assert "port" not in pool_kwargs["kwargs"] + assert "user" not in pool_kwargs["kwargs"] + assert "password" not in pool_kwargs["kwargs"] + assert "dbname" not in pool_kwargs["kwargs"] + + +def test_alloydb_connection_class_calls_connector( + monkeypatch: pytest.MonkeyPatch, mock_alloydb_module: MagicMock +) -> None: + """The injected connection class should call the AlloyDB connector with psycopg settings.""" + mock_connector = MagicMock() + expected_connection = MagicMock() + mock_connector.connect.return_value = expected_connection + mock_alloydb_module.return_value = mock_connector + _CapturedSyncPool.calls.clear() + monkeypatch.setattr(psycopg_config, "ConnectionPool", _CapturedSyncPool) + + with patch("sqlspec.adapters.psycopg.config.ALLOYDB_CONNECTOR_INSTALLED", True): + config = PsycopgSyncConfig( + connection_config={"user": "testuser", "password": "testpass", "dbname": "app"}, + driver_features={ + "enable_alloydb": True, + "alloydb_instance_uri": "projects/p/locations/r/clusters/c/instances/i", + "enable_alloydb_iam_auth": True, + "alloydb_ip_type": "PSC", + }, + ) + config._create_pool() + + _conninfo, pool_kwargs = _CapturedSyncPool.calls[-1] + connection_class = pool_kwargs["connection_class"] + + assert connection_class.connect("ignored") is expected_connection + mock_connector.connect.assert_called_once_with( + "projects/p/locations/r/clusters/c/instances/i", + "psycopg", + enable_iam_auth=True, + ip_type="PSC", + user="testuser", + password="testpass", + db="app", + ) + + +def test_alloydb_connector_cleanup(mock_alloydb_module: MagicMock) -> None: + """Closing a sync psycopg config should close the AlloyDB connector.""" + mock_connector = MagicMock() + mock_alloydb_module.return_value = mock_connector + with patch("sqlspec.adapters.psycopg.config.ALLOYDB_CONNECTOR_INSTALLED", True): + config = PsycopgSyncConfig( + connection_config={"user": "testuser", "password": "testpass", "dbname": "app"}, + driver_features={ + "enable_alloydb": True, + "alloydb_instance_uri": "projects/p/locations/r/clusters/c/instances/i", + }, + ) + config._setup_alloydb_connector({}) + config._close_pool() + + mock_connector.close.assert_called_once() + assert config._alloydb_connector is None + + +@pytest.mark.anyio +async def test_async_config_does_not_validate_or_route_alloydb(monkeypatch: pytest.MonkeyPatch) -> None: + """PsycopgAsyncConfig should remain unaffected by sync-only AlloyDB support.""" + _CapturedAsyncPool.calls.clear() + _CapturedAsyncPool.open_calls = 0 + monkeypatch.setattr(psycopg_config, "AsyncConnectionPool", _CapturedAsyncPool) + + config = PsycopgAsyncConfig( + connection_config={"dbname": "app"}, + driver_features={"enable_alloydb": True, "alloydb_instance_uri": "invalid-format"}, + ) + + await config._create_pool() + + conninfo, pool_kwargs = _CapturedAsyncPool.calls[-1] + assert conninfo == "" + assert pool_kwargs["kwargs"] == {"dbname": "app"} + assert "connection_class" not in pool_kwargs + assert config.driver_features["enable_alloydb"] is True