Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docs/reference/adapters/psycopg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,27 @@ Psycopg
PostgreSQL adapter using `psycopg 3 <https://www.psycopg.org/psycopg3/>`_ with both
sync and async support. Features native pipeline mode for multi-statement batching.

Cloud Connectivity
==================

``PsycopgSyncConfig`` can create sync pools through the Google AlloyDB in-process
connector when both ``psycopg`` and ``google-cloud-alloydb-connector`` are installed:

.. code-block:: python

from sqlspec.adapters.psycopg import PsycopgSyncConfig

config = PsycopgSyncConfig(
connection_config={"user": "app", "password": "secret", "dbname": "orders"},
driver_features={
"enable_alloydb": True,
"alloydb_instance_uri": "projects/project/locations/us-central1/clusters/cluster/instances/instance",
},
)

This connector path is sync-only. ``PsycopgAsyncConfig`` continues to use regular
psycopg async connection configuration.

Sync Configuration
==================

Expand Down
120 changes: 114 additions & 6 deletions sqlspec/adapters/psycopg/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import TYPE_CHECKING, Any, ClassVar, Literal, TypedDict, cast

from mypy_extensions import mypyc_attr
from psycopg import Connection as PsycopgConnection
from psycopg_pool import AsyncConnectionPool, ConnectionPool
from typing_extensions import NotRequired

Expand Down Expand Up @@ -31,8 +32,9 @@
from sqlspec.config import AsyncDatabaseConfig, ExtensionConfigs, SyncDatabaseConfig
from sqlspec.driver._async import AsyncPoolConnectionContext, AsyncPoolSessionFactory
from sqlspec.driver._sync import SyncPoolConnectionContext, SyncPoolSessionFactory
from sqlspec.exceptions import ImproperConfigurationError
from sqlspec.exceptions import ImproperConfigurationError, MissingDependencyError
from sqlspec.extensions.events import EventRuntimeHints
from sqlspec.typing import ALLOYDB_CONNECTOR_INSTALLED
from sqlspec.utils.config_tools import normalize_connection_config

if TYPE_CHECKING:
Expand All @@ -58,6 +60,8 @@


PsycopgSSLMode = Literal["disable", "allow", "prefer", "require", "verify-ca", "verify-full"]
_ALLOYDB_DIRECT_CONNECTION_KEYS = ("conninfo", "host", "hostaddr", "port", "user", "password", "dbname", "db")
_ALLOYDB_CONNECTOR_PACKAGE = "google-cloud-alloydb-connector"


class PsycopgConnectionParams(TypedDict):
Expand Down Expand Up @@ -137,6 +141,13 @@ class PsycopgDriverFeatures(TypedDict):
- "table_queue": Durable table-backed queue with retries and exactly-once delivery (current default)
- "listen_notify_durable": Hybrid - real-time + durable (available when native support lands)
Defaults to "table_queue" until native LISTEN/NOTIFY support is implemented.
enable_alloydb: Enable sync-only Google AlloyDB connector integration.
Requires google-cloud-alloydb-connector. Defaults to False.
alloydb_instance_uri: AlloyDB instance URI in projects/... resource path form.
Required when enable_alloydb is True.
enable_alloydb_iam_auth: Enable AlloyDB IAM database authentication for sync connector connections.
Defaults to False.
alloydb_ip_type: AlloyDB connector IP type. Defaults to PRIVATE.
"""

enable_pgvector: NotRequired[bool]
Expand All @@ -146,6 +157,40 @@ class PsycopgDriverFeatures(TypedDict):
on_connection_create: NotRequired["Callable[..., Any]"]
enable_events: NotRequired[bool]
events_backend: NotRequired[str]
enable_alloydb: NotRequired[bool]
alloydb_instance_uri: NotRequired[str]
enable_alloydb_iam_auth: NotRequired[bool]
alloydb_ip_type: NotRequired[str]


def _make_alloydb_connection_class(
*,
connector: Any,
instance_uri: str,
enable_iam_auth: bool,
ip_type: str,
user: str | None,
password: str | None,
database: str | None,
) -> "type[PsycopgConnection[Any]]":
"""Build a psycopg connection class backed by the AlloyDB connector."""

class _AlloyDBPsycopgConnection(PsycopgConnection[Any]):
@classmethod
def connect(cls, conninfo: str = "", **kwargs: Any) -> "PsycopgSyncConnection":
_ = (cls, conninfo)
connector_kwargs = dict(kwargs)
connector_kwargs["enable_iam_auth"] = enable_iam_auth
connector_kwargs["ip_type"] = ip_type
if user is not None:
connector_kwargs["user"] = user
if password is not None:
connector_kwargs["password"] = password
if database is not None:
connector_kwargs["db"] = database
return cast("PsycopgSyncConnection", connector.connect(instance_uri, "psycopg", **connector_kwargs))

return _AlloyDBPsycopgConnection


class PsycopgSyncConnectionContext(SyncPoolConnectionContext):
Expand Down Expand Up @@ -252,8 +297,12 @@ def __init__(
self._user_connection_hook: Callable[[PsycopgSyncConnection], None] | None = features_dict.pop(
"on_connection_create", None
)
features_dict.setdefault("enable_alloydb", False)
features_dict.setdefault("enable_alloydb_iam_auth", False)
features_dict.setdefault("alloydb_ip_type", "PRIVATE")
self._pgvector_available: bool | None = None
self._paradedb_available: bool | None = None
self._alloydb_connector: Any | None = None

super().__init__(
connection_config=connection_config,
Expand All @@ -266,6 +315,57 @@ def __init__(
**kwargs,
)

self._validate_alloydb_config()

def _validate_alloydb_config(self) -> None:
"""Validate sync-only AlloyDB connector configuration."""
if not self.driver_features.get("enable_alloydb", False):
return

if not ALLOYDB_CONNECTOR_INSTALLED:
raise MissingDependencyError(package=_ALLOYDB_CONNECTOR_PACKAGE, install_package=_ALLOYDB_CONNECTOR_PACKAGE)

instance_uri = self.driver_features.get("alloydb_instance_uri")
if not instance_uri:
msg = (
"alloydb_instance_uri required when enable_alloydb is True. "
"Format: 'projects/PROJECT/locations/REGION/clusters/CLUSTER/instances/INSTANCE'"
)
raise ImproperConfigurationError(msg)

if not str(instance_uri).startswith("projects/"):
msg = (
f"Invalid AlloyDB instance URI format: {instance_uri}. "
"Expected format: 'projects/PROJECT/locations/REGION/clusters/CLUSTER/instances/INSTANCE'"
)
raise ImproperConfigurationError(msg)

def _setup_alloydb_connector(
self, config: "dict[str, Any]", pool_parameters: "dict[str, Any] | None" = None
) -> None:
"""Setup AlloyDB connector and configure psycopg-pool connection_class."""
from google.cloud.alloydb.connector import Connector # type: ignore[import-untyped,unused-ignore]

self._alloydb_connector = Connector()

user = config.get("user")
password = config.get("password")
database = config.get("dbname") or config.get("db")

for key in _ALLOYDB_DIRECT_CONNECTION_KEYS:
config.pop(key, None)

if pool_parameters is not None:
pool_parameters["connection_class"] = _make_alloydb_connection_class(
connector=self._alloydb_connector,
instance_uri=str(self.driver_features["alloydb_instance_uri"]),
enable_iam_auth=bool(self.driver_features.get("enable_alloydb_iam_auth", False)),
ip_type=str(self.driver_features.get("alloydb_ip_type", "PRIVATE")),
user=cast("str | None", user),
password=cast("str | None", password),
database=cast("str | None", database),
)

def _create_pool(self) -> "ConnectionPool":
"""Create the actual connection pool."""
all_config = dict(self.connection_config)
Expand Down Expand Up @@ -295,6 +395,13 @@ def _create_pool(self) -> "ConnectionPool":
conninfo = all_config.pop("conninfo", None)
kwargs = all_config.pop("kwargs", {})
all_config.update(kwargs)

if self.driver_features.get("enable_alloydb", False):
if conninfo is not None:
all_config["conninfo"] = conninfo
self._setup_alloydb_connector(all_config, pool_parameters)
conninfo = None

if conninfo:
pool = ConnectionPool(conninfo, kwargs=all_config, **pool_parameters)
else:
Expand Down Expand Up @@ -336,14 +443,15 @@ def _configure_connection(self, conn: "PsycopgSyncConnection") -> None:
self._user_connection_hook(conn)

def _close_pool(self) -> None:
"""Close the actual connection pool."""
if not self.connection_instance:
return

"""Close the actual connection pool and cleanup connectors."""
try:
self.connection_instance.close()
if self.connection_instance:
self.connection_instance.close()
finally:
self.connection_instance = None
if self._alloydb_connector is not None:
self._alloydb_connector.close()
self._alloydb_connector = None

def create_connection(self) -> "PsycopgSyncConnection":
"""Create a single connection (not from pool).
Expand Down
Loading
Loading