Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/rapidata/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
RapidataJob,
RapidataJobDefinition,
RapidataJobManager,
RapidataSignal,
RapidataSignalManager,
SignalRun,
ValidationSetManager,
RapidataValidationSet,
Box,
Expand Down
1 change: 1 addition & 0 deletions src/rapidata/rapidata_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
)
from .order import RapidataOrderManager, RapidataOrder
from .job import RapidataJob, RapidataJobDefinition, RapidataJobManager
from .signal import RapidataSignal, RapidataSignalManager, SignalRun
from .validation import ValidationSetManager, RapidataValidationSet, Box
from .results import RapidataResults
from .selection import (
Expand Down
10 changes: 10 additions & 0 deletions src/rapidata/rapidata_client/rapidata_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
from rapidata.rapidata_client.datapoints._asset_uploader import AssetUploader
from rapidata.rapidata_client.job.rapidata_job_manager import RapidataJobManager
from rapidata.rapidata_client.flow.rapidata_flow_manager import RapidataFlowManager
from rapidata.rapidata_client.signal.rapidata_signal_manager import (
RapidataSignalManager,
)
from rapidata.rapidata_client.api.rapidata_api_client import (
optional_api_call,
mark_sdk_outdated,
Expand Down Expand Up @@ -88,6 +91,8 @@ def __init__(
audience (RapidataAudienceManager): The RapidataAudienceManager instance.
job (JobManager): The JobManager instance.
mri (RapidataBenchmarkManager): The RapidataBenchmarkManager instance.
signals (RapidataSignalManager): The RapidataSignalManager instance for managing
recurring audience-job schedules (signals) and observing their runs.
"""
tracer.set_session_id(
uuid.UUID(int=random.Random().getrandbits(128), version=4).hex
Expand Down Expand Up @@ -145,6 +150,11 @@ def __init__(
openapi_service=self._openapi_service
)

logger.debug("Initializing RapidataSignalManager")
self.signals = RapidataSignalManager(
openapi_service=self._openapi_service
)

logger.debug("Initializing RapidataDemographicManager")
self._demographic = DemographicManager(
openapi_service=self._openapi_service
Expand Down
3 changes: 3 additions & 0 deletions src/rapidata/rapidata_client/signal/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .rapidata_signal import RapidataSignal
from .rapidata_signal_manager import RapidataSignalManager
from .signal_run import SignalRun
95 changes: 95 additions & 0 deletions src/rapidata/rapidata_client/signal/_signal_http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from __future__ import annotations

import json
from typing import TYPE_CHECKING, Any

from rapidata.rapidata_client.exceptions.rapidata_error import RapidataError

if TYPE_CHECKING:
from rapidata.service.openapi_service import OpenAPIService


_JSON_HEADERS = {"Content-Type": "application/json", "Accept": "application/json"}


def signal_request(
openapi_service: OpenAPIService,
method: str,
path: str,
body: Any = None,
query: dict[str, Any] | None = None,
) -> Any:
"""Call the Signal Service via the platform gateway and return the parsed JSON body.

The OpenAPI client isn't generated for signals yet, so this routes raw HTTP through
the same authenticated OAuth client every other manager uses (it injects the bearer
token transparently on each request). Once the OpenAPI spec is regenerated this
helper goes away in favour of the typed client.

Args:
openapi_service: The OpenAPIService whose api_client we borrow.
method: HTTP method (uppercase).
path: Path under ``/signal``, e.g. ``"signals"`` or ``f"signals/{id}/runs"``.
A leading slash is accepted and stripped.
body: Optional JSON-serialisable request body.
query: Optional querystring parameters.

Returns:
The parsed JSON response body, or ``None`` for empty responses (e.g. 204).

Raises:
RapidataError: For any 4xx/5xx response.
"""
base = f"https://api.{openapi_service.environment}/signal"
url = f"{base}/{path.lstrip('/')}"

if query:
# Pass scalars as-is and join lists with commas; the signal service follows the
# same convention as the existing openapi-generated clients.
params: list[tuple[str, str]] = []
for key, value in query.items():
if value is None:
continue
if isinstance(value, bool):
params.append((key, "true" if value else "false"))
elif isinstance(value, (list, tuple)):
for item in value:
params.append((key, str(item)))
else:
params.append((key, str(value)))
if params:
from urllib.parse import urlencode

url = f"{url}?{urlencode(params)}"

serialized_body = json.dumps(body) if body is not None else None

response = openapi_service.api_client.call_api(
method.upper(),
url,
header_params=dict(_JSON_HEADERS),
body=serialized_body,
)

status = getattr(response, "status", 0)
raw = response.read()
text = raw.decode("utf-8") if raw else ""

if status < 200 or status >= 300:
details: Any = None
message = text or f"HTTP {status}"
try:
details = json.loads(text) if text else None
if isinstance(details, dict):
message = details.get("message") or details.get("title") or message
except json.JSONDecodeError:
pass
raise RapidataError(
status_code=status,
message=message,
details=details,
)

if not text:
return None
return json.loads(text)
Loading
Loading