Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 86 additions & 4 deletions src/schematic/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import atexit
import datetime as dt
import logging
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Union
Expand Down Expand Up @@ -40,6 +41,61 @@ class CheckFlagOptions:
timeout: Optional[float] = None


@dataclass
class TrackOptions:
"""Optional metadata for a track event.

Fields map directly to the corresponding ``CreateEventRequestBody``
properties. Omit any field you don't need; the SDK only sends fields
that are explicitly set.
"""

# Client-supplied dedupe key. Duplicate events with the same key
# (scoped to the environment) are dropped server-side for 24 hours.
idempotency_key: Optional[str] = None
# Timestamp the event was sent. Required when trusted_client_clock=True.
sent_at: Optional[dt.datetime] = None
# When True, use sent_at as the effective event timestamp instead of
# server receipt time. Requires a secret API key and sent_at.
trusted_client_clock: Optional[bool] = None
# Import historical data without affecting billing. Requires a secret
# API key and trusted_client_clock.
backfill: Optional[bool] = None


@dataclass
class IdentifyOptions:
"""Optional metadata for an identify event.

Fields map directly to the corresponding ``CreateEventRequestBody``
properties. Omit any field you don't need; the SDK only sends fields
that are explicitly set.
"""

# Client-supplied dedupe key. Duplicate events with the same key
# (scoped to the environment) are dropped server-side for 24 hours.
idempotency_key: Optional[str] = None


def _event_options_to_kwargs(
options: Optional[Union[TrackOptions, IdentifyOptions]],
) -> Dict[str, Any]:
"""Flatten an options dataclass into kwargs for CreateEventRequestBody.

Only fields that were explicitly set on the dataclass are returned, so
unset fields don't override CreateEventRequestBody's own defaults and
don't appear on the wire as explicit nulls.
"""
if options is None:
return {}
kwargs: Dict[str, Any] = {}
for field in ("idempotency_key", "sent_at", "trusted_client_clock", "backfill"):
value = getattr(options, field, None)
if value is not None:
kwargs[field] = value
return kwargs


@dataclass
class DataStreamConfig:
"""Configuration for DataStream real-time flag evaluation."""
Expand Down Expand Up @@ -274,6 +330,7 @@ def identify(
company: Optional[EventBodyIdentifyCompany] = None,
name: Optional[str] = None,
traits: Optional[Dict[str, Any]] = None,
options: Optional[IdentifyOptions] = None,
) -> None:
self._enqueue_event(
"identify",
Expand All @@ -283,6 +340,7 @@ def identify(
name=name,
traits=traits,
),
options=options,
)

def track(
Expand All @@ -292,6 +350,7 @@ def track(
user: Optional[Dict[str, str]] = None,
traits: Optional[Dict[str, Any]] = None,
quantity: Optional[int] = None,
options: Optional[TrackOptions] = None,
) -> None:
self._enqueue_event(
"track",
Expand All @@ -302,13 +361,23 @@ def track(
traits=traits,
user=user,
),
options=options,
)

def _enqueue_event(self, event_type: str, body: EventBody) -> None:
def _enqueue_event(
self,
event_type: str,
body: EventBody,
options: Optional[Union[TrackOptions, IdentifyOptions]] = None,
) -> None:
if self.offline:
return
try:
event_body = CreateEventRequestBody(event_type=event_type, body=body)
event_body = CreateEventRequestBody(
event_type=event_type,
body=body,
**_event_options_to_kwargs(options),
)
self.event_buffer.push(event_body)
except Exception as e:
self.logger.error(e)
Expand Down Expand Up @@ -700,6 +769,7 @@ async def identify(
company: Optional[EventBodyIdentifyCompany] = None,
name: Optional[str] = None,
traits: Optional[Dict[str, Any]] = None,
options: Optional[IdentifyOptions] = None,
) -> None:
await self._enqueue_event(
"identify",
Expand All @@ -709,6 +779,7 @@ async def identify(
name=name,
traits=traits,
),
options=options,
)

async def track(
Expand All @@ -718,6 +789,7 @@ async def track(
user: Optional[Dict[str, str]] = None,
traits: Optional[Dict[str, Any]] = None,
quantity: Optional[int] = None,
options: Optional[TrackOptions] = None,
) -> None:
await self._enqueue_event(
"track",
Expand All @@ -728,6 +800,7 @@ async def track(
traits=traits,
user=user,
),
options=options,
)

# Update company metrics in DataStream if available and connected
Expand All @@ -742,11 +815,20 @@ async def track(
except Exception as e:
self.logger.error(f"Failed to update company metrics: {e}")

async def _enqueue_event(self, event_type: str, body: EventBody) -> None:
async def _enqueue_event(
self,
event_type: str,
body: EventBody,
options: Optional[Union[TrackOptions, IdentifyOptions]] = None,
) -> None:
if self.offline:
return
try:
event_body = CreateEventRequestBody(event_type=event_type, body=body)
event_body = CreateEventRequestBody(
event_type=event_type,
body=body,
**_event_options_to_kwargs(options),
)
await self.event_buffer.push(event_body)
except Exception as e:
self.logger.error(e)
Expand Down
32 changes: 25 additions & 7 deletions src/schematic/event_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,44 @@ class _CaptureEventPayload(UniversalBaseModel):
"""Wire format for a single event sent to the capture service.

Mirrors the shape used by the Go/Ruby/C# SDKs: `type` (not `event_type`)
and an `api_key` field embedded on each event.
and an `api_key` field embedded on each event. The optional metadata
fields (idempotency_key, sent_at, trusted_client_clock, backfill) map
directly to the equivalent fields on ``CreateEventRequestBody``.
"""

api_key: str = pydantic.Field()
body: typing.Optional[EventBody] = None
type: EventType = pydantic.Field()
idempotency_key: typing.Optional[str] = None
sent_at: typing.Optional[dt.datetime] = None
trusted_client_clock: typing.Optional[bool] = None
backfill: typing.Optional[bool] = None


class _CaptureBatchPayload(UniversalBaseModel):
events: typing.List[_CaptureEventPayload]


def _to_payload(event: CreateEventRequestBody, api_key: str) -> _CaptureEventPayload:
return _CaptureEventPayload(
api_key=api_key,
body=event.body,
type=event.event_type,
sent_at=event.sent_at,
)
# Build kwargs conditionally so unset optional fields stay unset on the
# model. The capture wire format uses `exclude_unset`-style semantics —
# we don't want to send `"idempotency_key": null` for events that didn't
# set one.
kwargs: typing.Dict[str, typing.Any] = {
"api_key": api_key,
"type": event.event_type,
}
if event.body is not None:
kwargs["body"] = event.body
if event.idempotency_key is not None:
kwargs["idempotency_key"] = event.idempotency_key
if event.sent_at is not None:
kwargs["sent_at"] = event.sent_at
if event.trusted_client_clock is not None:
kwargs["trusted_client_clock"] = event.trusted_client_clock
if event.backfill is not None:
kwargs["backfill"] = event.backfill
return _CaptureEventPayload(**kwargs)


def _build_endpoint(base_url: str) -> str:
Expand Down
117 changes: 117 additions & 0 deletions tests/custom/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
AsyncSchematic,
AsyncSchematicConfig,
CheckFlagOptions,
IdentifyOptions,
Schematic,
SchematicConfig,
TrackOptions,
)
from schematic.types import CheckFlagResponseData, FeatureEntitlement

Expand Down Expand Up @@ -159,6 +161,87 @@ def test_track_with_quantity(self):
)
mock_push.assert_called_once()

def test_track_with_idempotency_key(self):
"""idempotency_key set via TrackOptions must land on the
CreateEventRequestBody pushed to the event buffer so the server can
dedupe on it."""
with patch.object(self.schematic.event_buffer, "push") as mock_push:
self.schematic.track(
event="credit-consumed",
company={"id": "company_id"},
options=TrackOptions(idempotency_key="evt_abc123"),
)
mock_push.assert_called_once()
pushed = mock_push.call_args.args[0]
self.assertEqual(pushed.idempotency_key, "evt_abc123")

def test_track_without_options_leaves_optional_fields_none(self):
"""Options are opt-in — omitting `options` must leave every optional
metadata field at None on the wire."""
with patch.object(self.schematic.event_buffer, "push") as mock_push:
self.schematic.track(
event="some-event",
company={"id": "company_id"},
)
pushed = mock_push.call_args.args[0]
self.assertIsNone(pushed.idempotency_key)
self.assertIsNone(pushed.sent_at)
self.assertIsNone(pushed.trusted_client_clock)
self.assertIsNone(pushed.backfill)

def test_track_with_full_options(self):
"""Every TrackOptions field should land on the CreateEventRequestBody."""
import datetime as dt
sent_at = dt.datetime(2026, 5, 21, 12, 0, 0, tzinfo=dt.timezone.utc)
with patch.object(self.schematic.event_buffer, "push") as mock_push:
self.schematic.track(
event="historical-import",
company={"id": "company_id"},
options=TrackOptions(
idempotency_key="evt_xyz",
sent_at=sent_at,
trusted_client_clock=True,
backfill=True,
),
)
pushed = mock_push.call_args.args[0]
self.assertEqual(pushed.idempotency_key, "evt_xyz")
self.assertEqual(pushed.sent_at, sent_at)
self.assertTrue(pushed.trusted_client_clock)
self.assertTrue(pushed.backfill)

def test_track_partial_options(self):
"""Unset TrackOptions fields stay None on the CreateEventRequestBody —
we don't accidentally send explicit nulls for things the caller didn't ask for."""
with patch.object(self.schematic.event_buffer, "push") as mock_push:
self.schematic.track(
event="some-event",
company={"id": "company_id"},
options=TrackOptions(idempotency_key="just-the-key"),
)
pushed = mock_push.call_args.args[0]
self.assertEqual(pushed.idempotency_key, "just-the-key")
self.assertIsNone(pushed.sent_at)
self.assertIsNone(pushed.trusted_client_clock)
self.assertIsNone(pushed.backfill)

def test_identify_with_options(self):
"""IdentifyOptions must plumb through to the CreateEventRequestBody."""
with patch.object(self.schematic.event_buffer, "push") as mock_push:
self.schematic.identify(
keys={"id": "user_id"},
options=IdentifyOptions(idempotency_key="ident_123"),
)
pushed = mock_push.call_args.args[0]
self.assertEqual(pushed.idempotency_key, "ident_123")

def test_identify_without_options(self):
"""Existing identify callers without options keep working unchanged."""
with patch.object(self.schematic.event_buffer, "push") as mock_push:
self.schematic.identify(keys={"id": "user_id"}, name="User Name")
pushed = mock_push.call_args.args[0]
self.assertIsNone(pushed.idempotency_key)

def test_check_flag_with_no_cache(self):
"""Verify that when cache_providers is empty, every call hits the API."""
config = SchematicConfig(
Expand Down Expand Up @@ -766,6 +849,40 @@ async def test_track(self):
)
mock_push.assert_called_once()

async def test_track_with_options(self):
"""All TrackOptions fields must plumb through async track() to the
CreateEventRequestBody."""
import datetime as dt
sent_at = dt.datetime(2026, 5, 21, 12, 0, 0, tzinfo=dt.timezone.utc)
with patch.object(self.async_schematic.event_buffer, "push") as mock_push:
await self.async_schematic.track(
event="credit-consumed",
company={"id": "company_id"},
options=TrackOptions(
idempotency_key="evt_abc123",
sent_at=sent_at,
trusted_client_clock=True,
backfill=False,
),
)
mock_push.assert_called_once()
pushed = mock_push.call_args.args[0]
assert pushed.idempotency_key == "evt_abc123"
assert pushed.sent_at == sent_at
assert pushed.trusted_client_clock is True
# backfill=False is explicitly set; it should land on the body.
assert pushed.backfill is False

async def test_async_identify_with_options(self):
"""IdentifyOptions must plumb through async identify()."""
with patch.object(self.async_schematic.event_buffer, "push") as mock_push:
await self.async_schematic.identify(
keys={"id": "user_id"},
options=IdentifyOptions(idempotency_key="ident_async"),
)
pushed = mock_push.call_args.args[0]
assert pushed.idempotency_key == "ident_async"

async def test_check_flag_with_no_cache(self):
"""Verify that when cache_providers is empty, every call hits the API."""
config = AsyncSchematicConfig(
Expand Down
Loading
Loading