Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions docs/reference/extensions/events.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ backend owns its own listener hub that:
* Serializes subscribe / unsubscribe under a lock so concurrent callers
cannot race on driver-level statements that share the connection.

The Oracle native backends (``advanced_queue`` and
``transactional_event_queue``) use an analogous pattern: a per-channel
The Oracle native backends (``aq`` and
``txeventq``) use an analogous pattern: a per-channel
queue-handle cache backed by a single dedicated session per backend instance.
``dequeue`` honors ``min(poll_interval, aq_wait_seconds)`` as its wait bound so
the caller's polling cadence is respected.
Expand All @@ -37,8 +37,8 @@ Oracle native event backends
Oracle provides two **native** messaging backends in addition to the default
``table_queue``:

* ``advanced_queue`` — classic Oracle Advanced Queuing (AQ).
* ``transactional_event_queue`` — Oracle Transactional Event Queues (TxEventQ).
* ``aq`` — classic Oracle Advanced Queuing (AQ).
* ``txeventq`` — Oracle Transactional Event Queues (TxEventQ).

Both share the same client path and JSON payloads; they differ only in how the
underlying queue is provisioned. Select one via ``events.backend``:
Expand All @@ -49,7 +49,7 @@ underlying queue is provisioned. Select one via ``events.backend``:

config = OracleAsyncConfig(
connection_config={"dsn": "..."},
extension_config={"events": {"backend": "transactional_event_queue"}},
extension_config={"events": {"backend": "txeventq"}},
)

The default remains ``table_queue``, which works on every Oracle edition
Expand All @@ -70,9 +70,9 @@ Provisioning
The backend attaches to an existing queue; it does not create one. Provision the
queue with ``DBMS_AQADM`` first:

* ``advanced_queue`` — ``create_queue_table(queue_payload_type => 'JSON')`` +
* ``aq`` — ``create_queue_table(queue_payload_type => 'JSON')`` +
``create_queue`` + ``start_queue``.
* ``transactional_event_queue`` —
* ``txeventq`` —
``create_transactional_event_queue(queue_payload_type => 'JSON', multiple_consumers => FALSE)``
+ ``start_queue``.

Expand Down
8 changes: 4 additions & 4 deletions sqlspec/adapters/oracledb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
OracleServerType = Literal["dedicated", "shared", "pooled"]
OraclePoolBoundary = Literal["statement", "transaction"]
OracleVectorReturnFormat = Literal["array", "list", "numpy"]
OracleEventsBackend = Literal["advanced_queue", "table_queue", "transactional_event_queue"]
OracleEventsBackend = Literal["aq", "table_queue", "txeventq"]


class OracleConnectionParams(TypedDict):
Expand Down Expand Up @@ -192,9 +192,9 @@ class OracleDriverFeatures(TypedDict):
This is separate from connection_config["events"], which enables python-oracledb
Thick mode database event notifications for HA and continuous query notification.
events_backend: Event channel backend selection.
Options: "advanced_queue", "table_queue", "transactional_event_queue"
- "advanced_queue": Oracle Advanced Queuing (native messaging, requires DBMS_AQADM privileges)
- "transactional_event_queue": Oracle Transactional Event Queues (native messaging, requires
Options: "aq", "table_queue", "txeventq"
- "aq": Oracle Advanced Queuing (native messaging, requires DBMS_AQADM privileges)
- "txeventq": Oracle Transactional Event Queues (native messaging, requires
DBMS_AQADM privileges; provisioned via DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE)
- "table_queue": Durable table-backed queue with retries and exactly-once delivery
Defaults to "table_queue" (works on all Oracle editions without special privileges).
Expand Down
4 changes: 2 additions & 2 deletions sqlspec/adapters/oracledb/events/_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def __init__(
visibility: "int | None",
default_visibility: "int | None",
wait_ceiling: int,
backend_name: str = "advanced_queue",
backend_name: str = "aq",
) -> None:
self._config = config
self._queue_name_template = queue_name_template
Expand Down Expand Up @@ -242,7 +242,7 @@ def __init__(
visibility: "int | None",
default_visibility: "int | None",
wait_ceiling: int,
backend_name: str = "advanced_queue",
backend_name: str = "aq",
) -> None:
self._config = config
self._queue_name_template = queue_name_template
Expand Down
16 changes: 8 additions & 8 deletions sqlspec/adapters/oracledb/events/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class OracleSyncAQEventBackend:

supports_sync = True
supports_async = False
backend_name = "advanced_queue"
backend_name = "aq"

def __init__(self, config: "OracleSyncConfig", settings: "dict[str, Any] | None" = None) -> None:
if "oracledb" not in type(config).__module__:
Expand Down Expand Up @@ -170,7 +170,7 @@ class OracleAsyncAQEventBackend:

supports_sync = False
supports_async = True
backend_name = "advanced_queue"
backend_name = "aq"

def __init__(self, config: "OracleAsyncConfig", settings: "dict[str, Any] | None" = None) -> None:
if "oracledb" not in type(config).__module__:
Expand Down Expand Up @@ -247,15 +247,15 @@ class OracleSyncTxEventQEventBackend(OracleSyncAQEventBackend):

__slots__ = ()

backend_name = "transactional_event_queue"
backend_name = "txeventq"


class OracleAsyncTxEventQEventBackend(OracleAsyncAQEventBackend):
"""Oracle Transactional Event Queues backend for async Oracle adapters."""

__slots__ = ()

backend_name = "transactional_event_queue"
backend_name = "txeventq"


def _get_publish_queue(connection: Any, channel: str, queue_name: str) -> Any:
Expand Down Expand Up @@ -315,22 +315,22 @@ def create_event_backend(
"""EventChannel factory for the Oracle AQ backend."""
is_async = config.is_async
match (backend_name, is_async):
case ("advanced_queue", False):
case ("aq", False):
try:
return OracleSyncAQEventBackend(config, extension_settings) # type: ignore[arg-type]
except (ImproperConfigurationError, MissingDependencyError):
return None
case ("advanced_queue", True):
case ("aq", True):
try:
return OracleAsyncAQEventBackend(config, extension_settings) # type: ignore[arg-type]
except (ImproperConfigurationError, MissingDependencyError):
return None
case ("transactional_event_queue", False):
case ("txeventq", False):
try:
return OracleSyncTxEventQEventBackend(config, extension_settings) # type: ignore[arg-type]
except (ImproperConfigurationError, MissingDependencyError):
return None
case ("transactional_event_queue", True):
case ("txeventq", True):
try:
return OracleAsyncTxEventQEventBackend(config, extension_settings) # type: ignore[arg-type]
except (ImproperConfigurationError, MissingDependencyError):
Expand Down
4 changes: 2 additions & 2 deletions sqlspec/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,13 +540,13 @@ class EventsConfig(TypedDict):
Use in ``extension_config["events"]``.
"""

backend: NotRequired[Literal["listen_notify", "table_queue", "listen_notify_durable", "advanced_queue"]]
backend: NotRequired[Literal["listen_notify", "table_queue", "listen_notify_durable", "aq"]]
"""Backend implementation. PostgreSQL adapters default to 'listen_notify', others to 'table_queue'.

- listen_notify: Real-time PostgreSQL LISTEN/NOTIFY (ephemeral)
- table_queue: Durable table-backed queue with retries (all adapters)
- listen_notify_durable: Hybrid combining both (PostgreSQL only)
- advanced_queue: Oracle Advanced Queueing
- aq: Oracle Advanced Queueing
"""

queue_table: NotRequired[str]
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/adapters/oracledb/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def oracle_aq_privileges(oracle_23ai_service: "OracleService") -> None:
"""Grant the container app user the privileges required to run DBMS_AQADM.

Unlocks both classic Advanced Queuing and Transactional Event Queues for the
``advanced_queue`` / ``transactional_event_queue`` events backends. The grants persist
``aq`` / ``txeventq`` events backends. The grants persist
for the container lifetime, so session scope is sufficient and naturally idempotent
(re-granting an existing role/privilege is a no-op in Oracle).
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Async Oracle Advanced Queuing event channel parity tests.

Mirrors the asyncpg listen/notify parity bar for the async advanced_queue backend
Mirrors the asyncpg listen/notify parity bar for the async aq backend
(OracleAsyncAQEventBackend), which had no live-queue coverage before this suite.
"""

Expand Down Expand Up @@ -31,7 +31,7 @@ async def _wait_for_message(received: "list[Any]", count: int = 1) -> None:


def _async_config(oracle_service: OracleService, **events: Any) -> OracleAsyncConfig:
events_config: dict[str, Any] = {"backend": "advanced_queue", **events}
events_config: dict[str, Any] = {"backend": "aq", **events}
return OracleAsyncConfig(
connection_config={
"host": oracle_service.host,
Expand All @@ -50,7 +50,7 @@ def _async_config(oracle_service: OracleService, **events: Any) -> OracleAsyncCo
async def oracle_aq_async_config(
provision_classic_aq: "Callable[..., AbstractContextManager[None]]", oracle_23ai_service: OracleService
) -> "AsyncGenerator[OracleAsyncConfig, None]":
"""Async Oracle config backed by a live advanced_queue queue."""
"""Async Oracle config backed by a live aq queue."""

config = _async_config(oracle_23ai_service)
with provision_classic_aq():
Expand All @@ -69,7 +69,7 @@ async def test_oracle_aq_async_publish_and_ack(oracle_aq_async_config: OracleAsy
channel = spec.event_channel(oracle_aq_async_config)

assert isinstance(channel, AsyncEventChannel)
assert channel._backend_name == "advanced_queue" # pyright: ignore[reportPrivateUsage]
assert channel._backend_name == "aq" # pyright: ignore[reportPrivateUsage]

event_id = await channel.publish("alerts", {"action": "test"})
assert len(event_id) == 32
Expand Down Expand Up @@ -147,7 +147,7 @@ async def test_oracle_aq_async_concurrent_multi_channel(
) -> None:
"""Concurrent listeners on distinct per-channel queues stay isolated and race-free.

The advanced_queue backend routes a channel to its own physical queue via the
The aq backend routes a channel to its own physical queue via the
``{channel}`` template, so each listener must see only its own channel's events even
while both drain the shared hub connection under an asyncio lock.
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Thin-mode JSON payload verification for the Oracle advanced_queue events backend.
"""Thin-mode JSON payload verification for the Oracle aq events backend.

Confirms that classic Advanced Queuing carries a structured JSON payload through a full
enqueue/dequeue cycle while the adapter is in its default thin mode (no Instant Client).
Expand All @@ -21,7 +21,7 @@
def oracle_aq_json_config(
provision_classic_aq: "Callable[..., AbstractContextManager[None]]", oracle_23ai_service: OracleService
) -> Generator[OracleSyncConfig, None, None]:
"""Provision a JSON-payload AQ queue for the advanced_queue backend."""
"""Provision a JSON-payload AQ queue for the aq backend."""

config = OracleSyncConfig(
connection_config={
Expand All @@ -31,7 +31,7 @@ def oracle_aq_json_config(
"user": oracle_23ai_service.user,
"password": oracle_23ai_service.password,
},
extension_config={"events": {"backend": "advanced_queue"}},
extension_config={"events": {"backend": "aq"}},
)

with provision_classic_aq(payload_type="JSON"):
Expand All @@ -41,7 +41,7 @@ def oracle_aq_json_config(
config.close_pool()


def test_advanced_queue_json_round_trips_in_thin_mode(oracle_aq_json_config: OracleSyncConfig) -> None:
def test_aq_json_round_trips_in_thin_mode(oracle_aq_json_config: OracleSyncConfig) -> None:
"""A nested JSON payload survives enqueue/dequeue unchanged with a thin-mode connection."""

with oracle_aq_json_config.provide_session() as driver:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def oracle_aq_poll_config(
"user": oracle_23ai_service.user,
"password": oracle_23ai_service.password,
},
extension_config={"events": {"backend": "advanced_queue", "aq_wait_seconds": _AQ_WAIT_SECONDS}},
extension_config={"events": {"backend": "aq", "aq_wait_seconds": _AQ_WAIT_SECONDS}},
)

with provision_classic_aq():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def _wait_for_message(received: "list[Any]", count: int = 1) -> None:
def oracle_aq_config(
provision_classic_aq: "Callable[..., AbstractContextManager[None]]", oracle_23ai_service: OracleService
) -> Generator[OracleSyncConfig, None, None]:
"""Provision Oracle config with a live advanced_queue queue for tests."""
"""Provision Oracle config with a live aq queue for tests."""

config = OracleSyncConfig(
connection_config={
Expand All @@ -40,7 +40,7 @@ def oracle_aq_config(
"user": oracle_23ai_service.user,
"password": oracle_23ai_service.password,
},
extension_config={"events": {"backend": "advanced_queue"}},
extension_config={"events": {"backend": "aq"}},
)

with provision_classic_aq():
Expand All @@ -59,7 +59,7 @@ def test_oracle_aq_publish_receive(oracle_aq_config: OracleSyncConfig) -> None:

assert isinstance(channel, SyncEventChannel)

assert channel._backend_name == "advanced_queue" # pyright: ignore[reportPrivateUsage]
assert channel._backend_name == "aq" # pyright: ignore[reportPrivateUsage]

event_id = channel.publish("alerts", {"action": "refresh"})
iterator = channel.iter_events("alerts", poll_interval=1.0)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Oracle Transactional Event Queues (TxEventQ) event channel integration tests.

Mirrors the advanced_queue parity bar for the transactional_event_queue backend, which
Mirrors the aq parity bar for the txeventq backend, which
shares the AQ client path and differs only in provisioning (CREATE_TRANSACTIONAL_EVENT_QUEUE).
"""

Expand Down Expand Up @@ -40,7 +40,7 @@ async def _async_wait_for_message(received: "list[Any]", count: int = 1) -> None


def _sync_config(oracle_service: OracleService, **events: Any) -> OracleSyncConfig:
events_config: dict[str, Any] = {"backend": "transactional_event_queue", "aq_queue": _QUEUE_NAME, **events}
events_config: dict[str, Any] = {"backend": "txeventq", "aq_queue": _QUEUE_NAME, **events}
return OracleSyncConfig(
connection_config={
"host": oracle_service.host,
Expand All @@ -54,7 +54,7 @@ def _sync_config(oracle_service: OracleService, **events: Any) -> OracleSyncConf


def _async_config(oracle_service: OracleService, **events: Any) -> OracleAsyncConfig:
events_config: dict[str, Any] = {"backend": "transactional_event_queue", "aq_queue": _QUEUE_NAME, **events}
events_config: dict[str, Any] = {"backend": "txeventq", "aq_queue": _QUEUE_NAME, **events}
return OracleAsyncConfig(
connection_config={
"host": oracle_service.host,
Expand Down Expand Up @@ -106,7 +106,7 @@ def test_txeventq_publish_receive(oracle_txeventq_config: OracleSyncConfig) -> N
channel = spec.event_channel(oracle_txeventq_config)

assert isinstance(channel, SyncEventChannel)
assert channel._backend_name == "transactional_event_queue" # pyright: ignore[reportPrivateUsage]
assert channel._backend_name == "txeventq" # pyright: ignore[reportPrivateUsage]

event_id = channel.publish("alerts", {"action": "refresh"})
message = next(channel.iter_events("alerts", poll_interval=1.0))
Expand Down Expand Up @@ -187,7 +187,7 @@ async def test_txeventq_async_publish_and_ack(oracle_txeventq_async_config: Orac
channel = spec.event_channel(oracle_txeventq_async_config)

assert isinstance(channel, AsyncEventChannel)
assert channel._backend_name == "transactional_event_queue" # pyright: ignore[reportPrivateUsage]
assert channel._backend_name == "txeventq" # pyright: ignore[reportPrivateUsage]

event_id = await channel.publish("alerts", {"action": "test"})
assert len(event_id) == 32
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/adapters/test_oracledb/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ def test_oracle_config_finite_options_use_literals_and_driver_enums() -> None:
"numpy",
}
assert set(get_args(_unwrap_not_required(driver_feature_hints["events_backend"]))) == {
"advanced_queue",
"aq",
"table_queue",
"transactional_event_queue",
"txeventq",
}


Expand Down
Loading
Loading