From 891f78c0ae714519a810b7b3402e9c1748df6e72 Mon Sep 17 00:00:00 2001 From: RapidPoseidon Date: Thu, 21 May 2026 10:27:02 +0000 Subject: [PATCH] feat(signal): add SignalManager to Python SDK Adds SDK support for the new Signal Service: - RapidataSignalManager exposed as `client.signals` for CRUD on signals. - RapidataSignal with lifecycle methods (pause/resume/delete/trigger/update), run listing, and a blocking wait_for_next_run helper. - SignalRun dataclass with terminal-status / success convenience properties. Backend service is shipping in parallel, so HTTP goes through the existing OAuth-authenticated api_client.call_api against /signal on the gateway. To upgrade after the OpenAPI regen: drop _signal_http and switch to the typed client. Co-Authored-By: luca --- src/rapidata/__init__.py | 3 + src/rapidata/rapidata_client/__init__.py | 1 + .../rapidata_client/rapidata_client.py | 10 + .../rapidata_client/signal/__init__.py | 3 + .../rapidata_client/signal/_signal_http.py | 95 ++++++ .../rapidata_client/signal/rapidata_signal.py | 318 ++++++++++++++++++ .../signal/rapidata_signal_manager.py | 147 ++++++++ .../rapidata_client/signal/signal_run.py | 86 +++++ 8 files changed, 663 insertions(+) create mode 100644 src/rapidata/rapidata_client/signal/__init__.py create mode 100644 src/rapidata/rapidata_client/signal/_signal_http.py create mode 100644 src/rapidata/rapidata_client/signal/rapidata_signal.py create mode 100644 src/rapidata/rapidata_client/signal/rapidata_signal_manager.py create mode 100644 src/rapidata/rapidata_client/signal/signal_run.py diff --git a/src/rapidata/__init__.py b/src/rapidata/__init__.py index 8a82e594a..88ab9c4ae 100644 --- a/src/rapidata/__init__.py +++ b/src/rapidata/__init__.py @@ -11,6 +11,9 @@ RapidataJob, RapidataJobDefinition, RapidataJobManager, + RapidataSignal, + RapidataSignalManager, + SignalRun, ValidationSetManager, RapidataValidationSet, Box, diff --git a/src/rapidata/rapidata_client/__init__.py b/src/rapidata/rapidata_client/__init__.py index 841392a6f..71b7166c2 100644 --- a/src/rapidata/rapidata_client/__init__.py +++ b/src/rapidata/rapidata_client/__init__.py @@ -7,6 +7,7 @@ ) from .order import RapidataOrderManager, RapidataOrder from .job import RapidataJob, RapidataJobDefinition, RapidataJobManager +from .signal import RapidataSignal, RapidataSignalManager, SignalRun from .validation import ValidationSetManager, RapidataValidationSet, Box from .results import RapidataResults from .selection import ( diff --git a/src/rapidata/rapidata_client/rapidata_client.py b/src/rapidata/rapidata_client/rapidata_client.py index f7e3e535f..c0e99ec3b 100644 --- a/src/rapidata/rapidata_client/rapidata_client.py +++ b/src/rapidata/rapidata_client/rapidata_client.py @@ -31,6 +31,9 @@ from rapidata.rapidata_client.datapoints._asset_uploader import AssetUploader from rapidata.rapidata_client.job.rapidata_job_manager import RapidataJobManager from rapidata.rapidata_client.flow.rapidata_flow_manager import RapidataFlowManager +from rapidata.rapidata_client.signal.rapidata_signal_manager import ( + RapidataSignalManager, +) from rapidata.rapidata_client.api.rapidata_api_client import ( optional_api_call, mark_sdk_outdated, @@ -88,6 +91,8 @@ def __init__( audience (RapidataAudienceManager): The RapidataAudienceManager instance. job (JobManager): The JobManager instance. mri (RapidataBenchmarkManager): The RapidataBenchmarkManager instance. + signals (RapidataSignalManager): The RapidataSignalManager instance for managing + recurring audience-job schedules (signals) and observing their runs. """ tracer.set_session_id( uuid.UUID(int=random.Random().getrandbits(128), version=4).hex @@ -145,6 +150,11 @@ def __init__( openapi_service=self._openapi_service ) + logger.debug("Initializing RapidataSignalManager") + self.signals = RapidataSignalManager( + openapi_service=self._openapi_service + ) + logger.debug("Initializing RapidataDemographicManager") self._demographic = DemographicManager( openapi_service=self._openapi_service diff --git a/src/rapidata/rapidata_client/signal/__init__.py b/src/rapidata/rapidata_client/signal/__init__.py new file mode 100644 index 000000000..4680b31e9 --- /dev/null +++ b/src/rapidata/rapidata_client/signal/__init__.py @@ -0,0 +1,3 @@ +from .rapidata_signal import RapidataSignal +from .rapidata_signal_manager import RapidataSignalManager +from .signal_run import SignalRun diff --git a/src/rapidata/rapidata_client/signal/_signal_http.py b/src/rapidata/rapidata_client/signal/_signal_http.py new file mode 100644 index 000000000..4b0f6d070 --- /dev/null +++ b/src/rapidata/rapidata_client/signal/_signal_http.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +import json +from typing import TYPE_CHECKING, Any + +from rapidata.rapidata_client.exceptions.rapidata_error import RapidataError + +if TYPE_CHECKING: + from rapidata.service.openapi_service import OpenAPIService + + +_JSON_HEADERS = {"Content-Type": "application/json", "Accept": "application/json"} + + +def signal_request( + openapi_service: OpenAPIService, + method: str, + path: str, + body: Any = None, + query: dict[str, Any] | None = None, +) -> Any: + """Call the Signal Service via the platform gateway and return the parsed JSON body. + + The OpenAPI client isn't generated for signals yet, so this routes raw HTTP through + the same authenticated OAuth client every other manager uses (it injects the bearer + token transparently on each request). Once the OpenAPI spec is regenerated this + helper goes away in favour of the typed client. + + Args: + openapi_service: The OpenAPIService whose api_client we borrow. + method: HTTP method (uppercase). + path: Path under ``/signal``, e.g. ``"signals"`` or ``f"signals/{id}/runs"``. + A leading slash is accepted and stripped. + body: Optional JSON-serialisable request body. + query: Optional querystring parameters. + + Returns: + The parsed JSON response body, or ``None`` for empty responses (e.g. 204). + + Raises: + RapidataError: For any 4xx/5xx response. + """ + base = f"https://api.{openapi_service.environment}/signal" + url = f"{base}/{path.lstrip('/')}" + + if query: + # Pass scalars as-is and join lists with commas; the signal service follows the + # same convention as the existing openapi-generated clients. + params: list[tuple[str, str]] = [] + for key, value in query.items(): + if value is None: + continue + if isinstance(value, bool): + params.append((key, "true" if value else "false")) + elif isinstance(value, (list, tuple)): + for item in value: + params.append((key, str(item))) + else: + params.append((key, str(value))) + if params: + from urllib.parse import urlencode + + url = f"{url}?{urlencode(params)}" + + serialized_body = json.dumps(body) if body is not None else None + + response = openapi_service.api_client.call_api( + method.upper(), + url, + header_params=dict(_JSON_HEADERS), + body=serialized_body, + ) + + status = getattr(response, "status", 0) + raw = response.read() + text = raw.decode("utf-8") if raw else "" + + if status < 200 or status >= 300: + details: Any = None + message = text or f"HTTP {status}" + try: + details = json.loads(text) if text else None + if isinstance(details, dict): + message = details.get("message") or details.get("title") or message + except json.JSONDecodeError: + pass + raise RapidataError( + status_code=status, + message=message, + details=details, + ) + + if not text: + return None + return json.loads(text) diff --git a/src/rapidata/rapidata_client/signal/rapidata_signal.py b/src/rapidata/rapidata_client/signal/rapidata_signal.py new file mode 100644 index 000000000..84faa847e --- /dev/null +++ b/src/rapidata/rapidata_client/signal/rapidata_signal.py @@ -0,0 +1,318 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from time import monotonic, sleep +from typing import TYPE_CHECKING, Any + +from rapidata.rapidata_client.config import logger, managed_print, tracer +from rapidata.rapidata_client.signal._signal_http import signal_request +from rapidata.rapidata_client.signal.signal_run import ( + SignalRun, + _parse_datetime, + _parse_optional_datetime, +) + +if TYPE_CHECKING: + from rapidata.service.openapi_service import OpenAPIService + + +class RapidataSignal: + """An instance of a Rapidata signal. + + A signal runs an audience job creation on a recurring schedule. Each run produces a + :class:`SignalRun` that wraps the audience job that was created (or an explanation + of why the run was skipped or failed). Use the manager (:py:attr:`RapidataClient.signals`) + to create or look up signals; use the instance methods on this class to manage an + existing signal's lifecycle and observe its runs. + """ + + def __init__( + self, + openapi_service: OpenAPIService, + data: dict[str, Any], + ): + self._openapi_service = openapi_service + self._data: dict[str, Any] = {} + self._apply(data) + logger.debug("RapidataSignal initialized: %s", self.id) + + def _apply(self, data: dict[str, Any]) -> None: + """Replace this instance's cached state with the latest API payload.""" + self._data = data + self._id: str = data["id"] + self._name: str = data["name"] + self._description: str | None = data.get("description") + self._audience_id: str = data["audienceId"] + self._job_definition_id: str = data["jobDefinitionId"] + self._revision_number: int | None = data.get("revisionNumber") + self._interval_seconds: int = data["intervalSeconds"] + self._next_run_at: datetime = _parse_datetime(data["nextRunAt"]) + self._last_run_at: datetime | None = _parse_optional_datetime( + data.get("lastRunAt") + ) + self._is_paused: bool = bool(data.get("isPaused", False)) + self._is_public: bool = bool(data.get("isPublic", False)) + self._created_at: datetime = _parse_datetime(data["createdAt"]) + + @property + def id(self) -> str: + return self._id + + @property + def name(self) -> str: + return self._name + + @property + def description(self) -> str | None: + return self._description + + @property + def audience_id(self) -> str: + return self._audience_id + + @property + def job_definition_id(self) -> str: + return self._job_definition_id + + @property + def revision_number(self) -> int | None: + return self._revision_number + + @property + def interval_seconds(self) -> int: + return self._interval_seconds + + @property + def next_run_at(self) -> datetime: + return self._next_run_at + + @property + def last_run_at(self) -> datetime | None: + return self._last_run_at + + @property + def is_paused(self) -> bool: + return self._is_paused + + @property + def is_public(self) -> bool: + return self._is_public + + @property + def created_at(self) -> datetime: + return self._created_at + + def refresh(self) -> RapidataSignal: + """Re-fetch this signal from the server and update local state. + + Returns: + RapidataSignal: ``self`` for chaining. + """ + with tracer.start_as_current_span("RapidataSignal.refresh"): + data = signal_request( + self._openapi_service, "GET", f"signals/{self.id}" + ) + self._apply(data) + return self + + def pause(self) -> RapidataSignal: + """Pause the signal. Scheduled runs stop until :py:meth:`resume` is called. + + Returns: + RapidataSignal: ``self`` for chaining. + """ + with tracer.start_as_current_span("RapidataSignal.pause"): + logger.info("Pausing signal '%s'", self.id) + data = signal_request( + self._openapi_service, "POST", f"signals/{self.id}/pause" + ) + if data is not None: + self._apply(data) + else: + self.refresh() + managed_print(f"Signal '{self}' has been paused.") + return self + + def resume(self) -> RapidataSignal: + """Resume a paused signal. Scheduled runs start firing again at the configured interval. + + Returns: + RapidataSignal: ``self`` for chaining. + """ + with tracer.start_as_current_span("RapidataSignal.resume"): + logger.info("Resuming signal '%s'", self.id) + data = signal_request( + self._openapi_service, "POST", f"signals/{self.id}/resume" + ) + if data is not None: + self._apply(data) + else: + self.refresh() + managed_print(f"Signal '{self}' has been resumed.") + return self + + def delete(self) -> None: + """Delete the signal. Existing runs and their audience jobs are unaffected.""" + with tracer.start_as_current_span("RapidataSignal.delete"): + logger.info("Deleting signal '%s'", self.id) + signal_request( + self._openapi_service, "DELETE", f"signals/{self.id}" + ) + managed_print(f"Signal '{self}' has been deleted.") + + def trigger(self) -> SignalRun: + """Trigger the signal manually, creating one extra run outside the schedule. + + Returns: + SignalRun: The freshly created run (in its initial status, typically + ``Pending``). + """ + with tracer.start_as_current_span("RapidataSignal.trigger"): + logger.info("Triggering signal '%s'", self.id) + data = signal_request( + self._openapi_service, "POST", f"signals/{self.id}/trigger" + ) + return SignalRun._from_api(data) + + def update( + self, + *, + name: str | None = None, + description: str | None = None, + interval_seconds: int | None = None, + ) -> RapidataSignal: + """Update mutable fields on the signal. + + Args: + name: New display name. Omit to leave unchanged. + description: New description. Omit to leave unchanged. + interval_seconds: New scheduling interval. Omit to leave unchanged. + + Returns: + RapidataSignal: ``self``, refreshed with the server's response. + """ + with tracer.start_as_current_span("RapidataSignal.update"): + body: dict[str, Any] = {} + if name is not None: + body["name"] = name + if description is not None: + body["description"] = description + if interval_seconds is not None: + body["intervalSeconds"] = interval_seconds + + if not body: + # Nothing requested — avoid a useless round trip. + return self + + data = signal_request( + self._openapi_service, "PATCH", f"signals/{self.id}", body=body + ) + if data is not None: + self._apply(data) + else: + self.refresh() + return self + + def get_runs( + self, + page: int = 1, + page_size: int = 20, + sort_descending: bool = True, + ) -> list[SignalRun]: + """List historical runs of this signal. + + Args: + page: 1-indexed page number. + page_size: Number of runs per page. + sort_descending: When ``True`` (default), newest runs come first. + + Returns: + list[SignalRun]: The runs on the requested page. + """ + with tracer.start_as_current_span("RapidataSignal.get_runs"): + data = signal_request( + self._openapi_service, + "GET", + f"signals/{self.id}/runs", + query={ + "page": page, + "pageSize": page_size, + "sortDescending": sort_descending, + }, + ) + items = data.get("items", []) if isinstance(data, dict) else [] + return [SignalRun._from_api(item) for item in items] + + def get_run(self, run_id: str) -> SignalRun: + """Get a specific run by ID. + + Args: + run_id: The ID of the run to fetch. + + Returns: + SignalRun: The requested run. + """ + with tracer.start_as_current_span("RapidataSignal.get_run"): + data = signal_request( + self._openapi_service, "GET", f"signals/{self.id}/runs/{run_id}" + ) + return SignalRun._from_api(data) + + def wait_for_next_run( + self, + timeout: float = 300, + poll_interval: float = 5.0, + ) -> SignalRun: + """Block until the next run of this signal reaches a terminal status. + + Polls the signal's runs and waits for any run that started *after* this call was + made to reach a terminal status (``Completed``, ``Failed``, or ``Skipped``). + Useful after :py:meth:`trigger` or when watching a scheduled run come through. + + Args: + timeout: Maximum seconds to wait. Raises :py:class:`TimeoutError` on expiry. + poll_interval: Seconds between polls. + + Returns: + SignalRun: The first new run to reach a terminal status. + + Raises: + TimeoutError: If no new terminal run is observed within ``timeout`` seconds. + """ + if timeout <= 0: + raise ValueError("timeout must be positive") + if poll_interval <= 0: + raise ValueError("poll_interval must be positive") + + with tracer.start_as_current_span("RapidataSignal.wait_for_next_run"): + started_waiting = datetime.now(timezone.utc) + deadline = monotonic() + timeout + + logger.info( + "Waiting up to %.0fs for the next terminal run of signal '%s'", + timeout, + self.id, + ) + while True: + # Pull the most recent runs and look for any whose started_at is after we + # began waiting AND that has reached a terminal status. We re-look every + # poll because a Pending run may transition while we wait. + runs = self.get_runs(page=1, page_size=20, sort_descending=True) + candidates = [r for r in runs if r.started_at >= started_waiting] + terminal = [r for r in candidates if r.is_terminal] + if terminal: + # `runs` came back descending, so the last terminal in the list is + # the earliest new terminal — return that one (the next to finish). + return terminal[-1] + + if monotonic() >= deadline: + raise TimeoutError( + f"No new terminal run of signal '{self.id}' within {timeout} seconds." + ) + sleep(poll_interval) + + def __str__(self) -> str: + return f"RapidataSignal(name='{self.name}', id='{self.id}')" + + def __repr__(self) -> str: + return self.__str__() diff --git a/src/rapidata/rapidata_client/signal/rapidata_signal_manager.py b/src/rapidata/rapidata_client/signal/rapidata_signal_manager.py new file mode 100644 index 000000000..4ae1a7b23 --- /dev/null +++ b/src/rapidata/rapidata_client/signal/rapidata_signal_manager.py @@ -0,0 +1,147 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from rapidata.rapidata_client.config import logger, tracer +from rapidata.rapidata_client.signal._signal_http import signal_request +from rapidata.rapidata_client.signal.rapidata_signal import RapidataSignal +from rapidata.rapidata_client.signal.signal_run import SignalRun + +if TYPE_CHECKING: + from rapidata.service.openapi_service import OpenAPIService + + +class RapidataSignalManager: + """Manage signals: schedules that periodically create audience jobs. + + A signal binds an audience to a job definition and an interval. The signal service + creates one audience job per interval tick (a :class:`SignalRun`). Use this manager + to create, look up, list, and delete signals; use methods on the returned + :class:`RapidataSignal` to control a specific signal and observe its runs. + + Access this manager via :py:attr:`RapidataClient.signals`. + """ + + def __init__(self, openapi_service: OpenAPIService): + self.__openapi_service = openapi_service + logger.debug("RapidataSignalManager initialized") + + def create( + self, + name: str, + audience_id: str, + job_definition_id: str, + interval_seconds: int, + description: str | None = None, + revision_number: int | None = None, + is_public: bool = False, + ) -> RapidataSignal: + """Create a new signal. + + Args: + name: Display name for the signal. + audience_id: ID of the audience the signal targets each run. + job_definition_id: ID of the job definition that backs each run. + interval_seconds: How often the signal fires, in seconds. + description: Optional human-readable description. + revision_number: Optional explicit revision of ``job_definition_id`` to pin + this signal to. If omitted, the signal follows the latest revision. + is_public: Whether other users can discover and read this signal. + + Returns: + RapidataSignal: The created signal. + """ + if interval_seconds <= 0: + raise ValueError("interval_seconds must be positive") + + with tracer.start_as_current_span("RapidataSignalManager.create"): + body: dict[str, Any] = { + "name": name, + "audienceId": audience_id, + "jobDefinitionId": job_definition_id, + "intervalSeconds": interval_seconds, + "isPublic": is_public, + } + if description is not None: + body["description"] = description + if revision_number is not None: + body["revisionNumber"] = revision_number + + logger.debug( + "Creating signal '%s' (audience=%s, job_definition=%s, interval=%ds)", + name, + audience_id, + job_definition_id, + interval_seconds, + ) + data = signal_request(self.__openapi_service, "POST", "signals", body=body) + return RapidataSignal(self.__openapi_service, data) + + def get(self, signal_id: str) -> RapidataSignal: + """Fetch a signal by ID. + + Args: + signal_id: The signal to fetch. + + Returns: + RapidataSignal: The signal. + """ + with tracer.start_as_current_span("RapidataSignalManager.get"): + data = signal_request( + self.__openapi_service, "GET", f"signals/{signal_id}" + ) + return RapidataSignal(self.__openapi_service, data) + + def list( + self, + owned_by_me: bool = True, + include_public: bool = False, + page: int = 1, + page_size: int = 50, + ) -> list[RapidataSignal]: + """List signals visible to the caller. + + Args: + owned_by_me: When ``True`` (default), only return signals the caller owns. + include_public: When ``True``, also include signals other users have marked public. + page: 1-indexed page number. + page_size: Number of signals per page. + + Returns: + list[RapidataSignal]: The signals on the requested page. + """ + with tracer.start_as_current_span("RapidataSignalManager.list"): + data = signal_request( + self.__openapi_service, + "GET", + "signals", + query={ + "ownedByMe": owned_by_me, + "includePublic": include_public, + "page": page, + "pageSize": page_size, + }, + ) + items = data.get("items", []) if isinstance(data, dict) else [] + return [RapidataSignal(self.__openapi_service, item) for item in items] + + def get_run(self, run_id: str) -> SignalRun: + """Fetch a single signal run by ID, when the signal it belongs to is unknown. + + Args: + run_id: The run ID. + + Returns: + SignalRun: The run. + """ + with tracer.start_as_current_span("RapidataSignalManager.get_run"): + data = signal_request( + self.__openapi_service, "GET", f"runs/{run_id}" + ) + return SignalRun._from_api(data) + + def __str__(self) -> str: + return "RapidataSignalManager" + + def __repr__(self) -> str: + return self.__str__() diff --git a/src/rapidata/rapidata_client/signal/signal_run.py b/src/rapidata/rapidata_client/signal/signal_run.py new file mode 100644 index 000000000..5c5fe047b --- /dev/null +++ b/src/rapidata/rapidata_client/signal/signal_run.py @@ -0,0 +1,86 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime +from typing import Any, Literal + +SignalRunStatus = Literal["Pending", "Running", "Completed", "Failed", "Skipped"] +SignalRunTriggerSource = Literal["Scheduled", "Manual"] +_TERMINAL_STATUSES: frozenset[str] = frozenset({"Completed", "Failed", "Skipped"}) + + +def _parse_datetime(value: Any) -> datetime: + """Parse an ISO-8601 datetime string returned by the signal API.""" + if isinstance(value, datetime): + return value + if not isinstance(value, str): + raise TypeError( + f"Expected datetime or ISO-8601 string, got {type(value).__name__}: {value!r}" + ) + # `Z` suffix is valid ISO-8601 but Python's fromisoformat only accepts it on 3.11+; + # normalise to "+00:00" so this works on the SDK's minimum 3.10 target too. + if value.endswith("Z"): + value = value[:-1] + "+00:00" + return datetime.fromisoformat(value) + + +def _parse_optional_datetime(value: Any) -> datetime | None: + if value is None: + return None + return _parse_datetime(value) + + +@dataclass(frozen=True) +class SignalRun: + """A single execution of a Signal — the result of one audience job creation. + + A run is created either by the signal's scheduled interval or by an explicit + :py:meth:`RapidataSignal.trigger` call. It transitions through statuses until + it reaches a terminal one (``Completed``, ``Failed`` or ``Skipped``). + """ + + id: str + signal_id: str + owner_id: str + owner_mail: str + trigger_source: SignalRunTriggerSource + started_at: datetime + status: SignalRunStatus + audience_job_id: str | None + completed_at: datetime | None + result_file_name: str | None + failure_message: str | None + skipped_reason: str | None + created_at: datetime + + @property + def is_terminal(self) -> bool: + """True when the run has finished (``Completed``, ``Failed`` or ``Skipped``).""" + return self.status in _TERMINAL_STATUSES + + @property + def succeeded(self) -> bool: + """True when the run finished successfully (``Completed``).""" + return self.status == "Completed" + + @classmethod + def _from_api(cls, data: dict[str, Any]) -> SignalRun: + """Build a SignalRun from the JSON body returned by the signal service.""" + return cls( + id=data["id"], + signal_id=data["signalId"], + owner_id=data["ownerId"], + owner_mail=data["ownerMail"], + trigger_source=data["triggerSource"], + started_at=_parse_datetime(data["startedAt"]), + status=data["status"], + audience_job_id=data.get("audienceJobId"), + completed_at=_parse_optional_datetime(data.get("completedAt")), + result_file_name=data.get("resultFileName"), + failure_message=data.get("failureMessage"), + skipped_reason=data.get("skippedReason"), + created_at=_parse_datetime(data["createdAt"]), + ) + + def __str__(self) -> str: + return f"SignalRun(id='{self.id}', status='{self.status}')"