diff --git a/agentex/src/api/app.py b/agentex/src/api/app.py index 76f80015..de514d3f 100644 --- a/agentex/src/api/app.py +++ b/agentex/src/api/app.py @@ -35,7 +35,7 @@ GlobalDependencies, resolve_environment_variable_dependency, ) -from src.config.environment_variables import EnvVarKeys +from src.config.environment_variables import EnvironmentVariables, EnvVarKeys, Environment from src.domain.exceptions import GenericException from src.utils.logging import make_logger from src.utils.otel_metrics import init_otel_metrics, shutdown_otel_metrics @@ -195,6 +195,31 @@ async def handle_unexpected(request, exc): fastapi_app.include_router(checkpoints.router) fastapi_app.include_router(task_retention.router) +# Test-only seeding endpoint. Gated by env (must opt in AND be on a known +# non-prod environment) so this code path is unreachable in production +# deployments by construction -- the router is not even mounted. +# +# Allow-list rather than deny-list: ENVIRONMENT is typed `str | None` and +# populated raw from os.environ with no enum coercion, so a deny-list against +# Environment.PROD would fail OPEN on unset, "prod", "Production", typos, or +# any new environment name. Mount only when ENVIRONMENT is an explicitly +# known non-prod value. +_TEST_SEEDING_ALLOWED_ENVS = {Environment.DEV, Environment.STAGING} +_test_seeding_env_vars = EnvironmentVariables.refresh() +if ( + _test_seeding_env_vars is not None + and _test_seeding_env_vars.ENABLE_TEST_SEEDING + and _test_seeding_env_vars.ENVIRONMENT in _TEST_SEEDING_ALLOWED_ENVS +): + from src.api.routes import test_seeding + + fastapi_app.include_router(test_seeding.router) + logger.warning( + "Test seeding endpoint /test/seed is MOUNTED. " + "This must never happen in production.", + extra={"environment": _test_seeding_env_vars.ENVIRONMENT}, + ) + # Wrap FastAPI app with health check interceptor for sub-millisecond K8s probe responses. # This must be the outermost layer to bypass all middleware. # Export as `app` so existing uvicorn entry points (app:app) work without changes. diff --git a/agentex/src/api/routes/test_seeding.py b/agentex/src/api/routes/test_seeding.py new file mode 100644 index 00000000..b6945d7a --- /dev/null +++ b/agentex/src/api/routes/test_seeding.py @@ -0,0 +1,181 @@ +"""Test-only seeding endpoint. + +POST /test/seed lets e2e tests insert resource rows directly, bypassing the ACP +runtime. The router is only mounted in app.py when both: + - env_vars.ENABLE_TEST_SEEDING is true + - env_vars.ENVIRONMENT is an explicitly-allowed non-prod value + (Environment.DEV or Environment.STAGING). Unknown / typo'd / unset + environments fail closed because ENVIRONMENT is typed `str | None` + with no enum coercion. + +Per-request the endpoint also requires the X-Test-Seed-Token header to match +env_vars.TEST_SEED_TOKEN (compared with hmac.compare_digest). + +All gate failures return 404 (not 401/403) to avoid advertising the route's +existence on misconfigured deployments. This file + the use case + the env +config + the mount check in app.py are the four removal points if test seeding +ever moves to a separate test-utilities image. +""" + +from __future__ import annotations + +import hmac +from typing import Annotated, Any, Literal +from uuid import UUID + +from fastapi import APIRouter, Depends, Header, HTTPException, Request, status +from pydantic import Field + +from src.api.schemas.events import Event +from src.config.dependencies import GlobalDependencies +from src.config.environment_variables import Environment, EnvironmentVariables +from src.domain.use_cases.test_seeding_use_case import DTestSeedingUseCase +from src.utils.logging import make_logger +from src.utils.model_utils import BaseModel + +# Allow-list of non-prod environment names for the seeding gate. Kept in sync +# with the mount-time check in src/api/app.py — any new non-prod environment +# must be added in both places. +_ALLOWED_ENVS: frozenset[str] = frozenset({Environment.DEV, Environment.STAGING}) + + +def get_seeding_env_vars() -> EnvironmentVariables: + """Named dependency callable for the seeding gate. + + Defined as a named function (not an inline lambda) so tests can override it + via ``fastapi_app.dependency_overrides[get_seeding_env_vars] = ...``. The + process-wide ``DEnvironmentVariables`` alias uses an inline lambda which + cannot be keyed in dependency_overrides. + """ + return GlobalDependencies().environment_variables + +logger = make_logger(__name__) + +router = APIRouter(prefix="/test", tags=["TestSeeding"]) + + +# -- request payload schemas --------------------------------------------------- + + +class _EventSeedPayload(BaseModel): + """Payload for seeding a single event row.""" + + task_id: UUID = Field(..., description="Parent task UUID. Must already exist.") + agent_id: UUID = Field(..., description="Parent agent UUID. Must already exist.") + content: dict[str, Any] | None = Field( + None, + description=( + "Optional event content. Will be wrapped in a DataContentEntity and " + "have audit-marker keys ('seeded', 'seeded_at') added before persist." + ), + ) + id: UUID | None = Field( + None, + description="Optional event UUID override. Auto-generated if omitted.", + ) + + +class SeedEventRequest(BaseModel): + """Discriminated request for seeding an event. + + To add a new seedable resource (task, api_key, schedule, ...), add a sibling + `SeedXxxRequest` class with `resource_type: Literal["xxx"]`, then add it to + the `SeedRequest` union below and dispatch in the route handler. + """ + + resource_type: Literal["event"] + payload: _EventSeedPayload + + +# When adding a second resource type, change this to: +# SeedRequest = Annotated[ +# SeedEventRequest | SeedTaskRequest | ..., +# Field(discriminator="resource_type"), +# ] +# For now there is a single variant; we keep the discriminated shape so the +# eventual extension is mechanical. +SeedRequest = SeedEventRequest + + +# -- gate ---------------------------------------------------------------------- + + +def _require_test_seeding_enabled( + env_vars: Annotated[EnvironmentVariables, Depends(get_seeding_env_vars)], + x_test_seed_token: Annotated[str | None, Header(alias="X-Test-Seed-Token")] = None, +) -> None: + """Fail-closed gate for the seeding endpoint. + + All failure modes return 404 (not 401/403) so we don't advertise that the + route exists on misconfigured deployments. Token comparison is constant-time. + """ + not_found = HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Not Found" + ) + + # Hard env gate, regardless of flag. Allow-list rather than deny-list: + # ENVIRONMENT is raw os.environ with no enum coercion, so a deny-list + # against PROD would fail OPEN on unset / "prod" / "Production" / typos / + # new env names. Fail closed on anything we don't explicitly recognize as + # non-prod. + if env_vars.ENVIRONMENT not in _ALLOWED_ENVS: + raise not_found + + if not env_vars.ENABLE_TEST_SEEDING: + raise not_found + + expected = env_vars.TEST_SEED_TOKEN + if not expected: + # No token configured -> endpoint is unusable even with the flag on. + raise not_found + + if not x_test_seed_token: + raise not_found + + if not hmac.compare_digest(x_test_seed_token, expected): + raise not_found + + +# -- route --------------------------------------------------------------------- + + +@router.post( + "/seed", + response_model=Event, + status_code=status.HTTP_201_CREATED, + dependencies=[Depends(_require_test_seeding_enabled)], +) +async def seed_resource( + body: SeedRequest, + request: Request, + use_case: DTestSeedingUseCase, +) -> Event: + """Test-only direct insert. Returns the persisted resource entity. + + Extension point: when SeedRequest becomes a true union, dispatch on + body.resource_type here. Each branch should call into its matching + `use_case.seed_(...)` method. + """ + principal_id: str | None = None + principal_ctx = getattr(request.state, "principal_context", None) + if isinstance(principal_ctx, dict): + principal_id = principal_ctx.get("user_id") or principal_ctx.get( + "service_account_id" + ) + + if body.resource_type == "event": + payload = body.payload + event_entity = await use_case.seed_event( + task_id=str(payload.task_id), + agent_id=str(payload.agent_id), + content=payload.content, + id_override=str(payload.id) if payload.id is not None else None, + principal_id=principal_id, + ) + return Event.model_validate(event_entity) + + # Defensive: the discriminator should make this unreachable. + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Unsupported resource_type: {body.resource_type!r}", + ) diff --git a/agentex/src/config/environment_variables.py b/agentex/src/config/environment_variables.py index 0872c0cf..c71fb6c7 100644 --- a/agentex/src/config/environment_variables.py +++ b/agentex/src/config/environment_variables.py @@ -65,6 +65,8 @@ class EnvVarKeys(str, Enum): RETENTION_CLEANUP_PAGE_SIZE = "RETENTION_CLEANUP_PAGE_SIZE" RETENTION_CLEANUP_MAX_IN_FLIGHT = "RETENTION_CLEANUP_MAX_IN_FLIGHT" RETENTION_CLEANUP_DRY_RUN = "RETENTION_CLEANUP_DRY_RUN" + ENABLE_TEST_SEEDING = "ENABLE_TEST_SEEDING" + TEST_SEED_TOKEN = "TEST_SEED_TOKEN" class Environment(str, Enum): @@ -128,6 +130,11 @@ class EnvironmentVariables(BaseModel): RETENTION_CLEANUP_PAGE_SIZE: int = 200 RETENTION_CLEANUP_MAX_IN_FLIGHT: int = 20 RETENTION_CLEANUP_DRY_RUN: bool = True + # Test-only seeding gate. Both must be set for the /test/seed endpoint to be + # mounted. Defaults are fail-closed: endpoint is unreachable in any env that + # doesn't explicitly opt in. Hard-gated off in production regardless of flag. + ENABLE_TEST_SEEDING: bool = False + TEST_SEED_TOKEN: str | None = None @classmethod def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None: @@ -242,6 +249,10 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None: RETENTION_CLEANUP_DRY_RUN=( os.environ.get(EnvVarKeys.RETENTION_CLEANUP_DRY_RUN, "true") == "true" ), + ENABLE_TEST_SEEDING=( + os.environ.get(EnvVarKeys.ENABLE_TEST_SEEDING, "false") == "true" + ), + TEST_SEED_TOKEN=os.environ.get(EnvVarKeys.TEST_SEED_TOKEN), ) refreshed_environment_variables = environment_variables return refreshed_environment_variables diff --git a/agentex/src/domain/use_cases/test_seeding_use_case.py b/agentex/src/domain/use_cases/test_seeding_use_case.py new file mode 100644 index 00000000..7edc8d70 --- /dev/null +++ b/agentex/src/domain/use_cases/test_seeding_use_case.py @@ -0,0 +1,124 @@ +"""Test-only seeding use case. + +Inserts resource rows directly into the repositories without going through the +ACP runtime. Mounted only when ENABLE_TEST_SEEDING is true AND +ENVIRONMENT != production. The endpoint that calls into this use case is gated +to the same conditions plus a shared-secret header. See +src/api/routes/test_seeding.py. + +This module is deliberately isolated so it can be deleted in one surgical +removal when/if test seeding moves into a separate test-utilities image. +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Annotated, Any + +from fastapi import Depends + +from src.domain.entities.events import EventEntity +from src.domain.entities.task_messages import ( + DataContentEntity, + MessageAuthor, + TaskMessageContentEntity, + TaskMessageContentType, +) +from src.domain.repositories.event_repository import DEventRepository +from src.utils.ids import orm_id +from src.utils.logging import make_logger + +logger = make_logger(__name__) + + +class TestSeedingUseCase: # noqa: PT001 — not a pytest class; "Test" prefix is the use-case domain name + """Test-only resource seeding. + + Each `seed_` method writes a row directly via the matching + repository, mirroring the persistence half of the natural-flow write path + but skipping any downstream side effects (ACP forwards, etc.). + + NOTE on FGAC: events are not a first-class FGAC resource + (AgentexResourceType has only agent/task/api_key/schedule). Event authz + delegates to the parent agent. When seeding future FGAC-registered + resources (task, api_key, schedule), the corresponding seed_* method MUST + also call authorization_service.register_resource(...) before persisting, + mirroring the pattern in agent_api_keys_use_case._register_api_key_in_auth. + """ + + # Tell pytest not to collect this as a test class. Must come AFTER the + # docstring — Python only treats a string literal as __doc__ when it's + # the FIRST statement in the class body. + __test__ = False + + def __init__(self, event_repository: DEventRepository) -> None: + self.event_repository = event_repository + + async def seed_event( + self, + *, + task_id: str, + agent_id: str, + content: dict[str, Any] | None = None, + id_override: str | None = None, + principal_id: str | None = None, + ) -> EventEntity: + """Seed a single event row. + + Injects an audit marker `{"seeded": true, "seeded_at": }` into + the persisted content so downstream tests can filter for seeded rows. + """ + event_id = id_override or orm_id() + seeded_at = datetime.now(timezone.utc).isoformat() + + # Build the persisted content: start with any caller-supplied dict, then + # overlay the audit marker. If no content was supplied, persist just the + # marker as a DataContentEntity (events.content is nullable but we want + # the marker to always be present, and DATA is the only content type + # that accepts an arbitrary dict). + merged: dict[str, Any] = dict(content) if content else {} + merged["seeded"] = True + merged["seeded_at"] = seeded_at + + # Seeded events are synthetic; mark them as agent-authored. The author + # field is required by BaseTaskMessageContentEntity but has no + # semantic meaning for seeded rows -- the {"seeded": true} audit + # marker in `data` is the actual signal for downstream filtering. + content_entity: TaskMessageContentEntity = DataContentEntity( + type=TaskMessageContentType.DATA, + author=MessageAuthor.AGENT, + data=merged, + ) + + event = await self.event_repository.create( + id=event_id, + task_id=task_id, + agent_id=agent_id, + content=content_entity, + ) + + logger.info( + "test seeding wrote resource", + extra={ + "resource_type": "event", + "resource_id": event.id, + "principal_id": principal_id, + "task_id": task_id, + "agent_id": agent_id, + }, + ) + + # TODO when adding seed_task / seed_api_key / seed_schedule: + # FGAC-registered resources MUST also call + # authorization_service.register_resource( + # resource=AgentexResource.(), + # parent=AgentexResource.agent(), + # ) + # BEFORE persisting, mirroring agent_api_keys_use_case._register_api_key_in_auth. + # Events are exempt from this because event authz delegates to the parent + # agent (which must already exist & already be registered). + + return event + + +DTestSeedingUseCase = Annotated[TestSeedingUseCase, Depends(TestSeedingUseCase)] diff --git a/agentex/tests/integration/api/test_test_seeding_api.py b/agentex/tests/integration/api/test_test_seeding_api.py new file mode 100644 index 00000000..36710fdd --- /dev/null +++ b/agentex/tests/integration/api/test_test_seeding_api.py @@ -0,0 +1,346 @@ +"""Integration tests for the test-only /test/seed endpoint. + +Covers: + - Gate behavior (flag off, prod env, missing/wrong token) -> 404 + - Happy path: event row is persisted and returned + - Audit marker injected into content +""" + +from __future__ import annotations + +import pytest +import pytest_asyncio +from httpx import ASGITransport, AsyncClient +from src.api.app import fastapi_app +from src.api.routes import test_seeding as test_seeding_route +from src.api.routes.test_seeding import get_seeding_env_vars +from src.config.environment_variables import Environment +from src.domain.entities.agents import ACPType, AgentEntity +from src.domain.entities.tasks import TaskEntity, TaskStatus +from src.domain.use_cases.test_seeding_use_case import TestSeedingUseCase +from src.utils.ids import orm_id + +VALID_TOKEN = "test-seed-token-abc123" + + +class _FakeEnvVars: + """Minimal stand-in for EnvironmentVariables; only the attrs the gate reads.""" + + def __init__( + self, + *, + enabled: bool = True, + environment: str | None = Environment.DEV, + token: str | None = VALID_TOKEN, + ) -> None: + self.ENABLE_TEST_SEEDING = enabled + self.ENVIRONMENT = environment + self.TEST_SEED_TOKEN = token + + +@pytest_asyncio.fixture +async def seeded_agent_and_task(isolated_repositories): + """Create an agent + task so seeded events have valid FKs.""" + agent = await isolated_repositories["agent_repository"].create( + AgentEntity( + id=orm_id(), + name="seed-test-agent", + description="seed", + acp_url="http://acp:8000", + acp_type=ACPType.SYNC, + ) + ) + task = await isolated_repositories["task_repository"].create( + agent_id=agent.id, + task=TaskEntity( + id=orm_id(), + name="seed-test-task", + status=TaskStatus.RUNNING, + status_reason="seed", + ), + ) + return {"agent": agent, "task": task} + + +@pytest_asyncio.fixture +async def seeding_client(isolated_integration_app, isolated_repositories): + """Provide an httpx client with the seeding router mounted + gate overridden. + + The seeding router is normally only mounted when ENABLE_TEST_SEEDING=true at + process start. In tests we register the router on fastapi_app manually and + override DEnvironmentVariables to control the gate per test. + """ + # Mount the router (idempotent - FastAPI tolerates re-includes only via + # checking existing routes; safer to add once and rely on dependency + # overrides for gate behavior). + already_mounted = any( + getattr(r, "path", None) == "/test/seed" for r in fastapi_app.routes + ) + if not already_mounted: + fastapi_app.include_router(test_seeding_route.router) + + # Wire the use case to the isolated event repository. + def _make_use_case(): + return TestSeedingUseCase( + event_repository=isolated_repositories["event_repository"] + ) + + # Snapshot overrides so teardown can restore — covers both the fixture's + # own injections AND any mid-test mutations from `_override_env`. Without + # this, entries leak into fastapi_app's global state and can bleed into + # other test modules sharing the same app instance. + original_overrides = fastapi_app.dependency_overrides.copy() + + fastapi_app.dependency_overrides[TestSeedingUseCase] = _make_use_case + + # Default to enabled + dev env + valid token. + fastapi_app.dependency_overrides[get_seeding_env_vars] = lambda: _FakeEnvVars() + + from src.api.app import app as wrapped_app + + try: + async with AsyncClient( + transport=ASGITransport(app=wrapped_app), base_url="http://test" + ) as client: + yield client, isolated_repositories + finally: + fastapi_app.dependency_overrides = original_overrides + + +def _override_env(env_vars: _FakeEnvVars) -> None: + fastapi_app.dependency_overrides[get_seeding_env_vars] = lambda: env_vars + + +@pytest.mark.integration +class TestTestSeedingGate: + @pytest.mark.asyncio + async def test_flag_off_returns_404(self, seeding_client, seeded_agent_and_task): + client, _ = seeding_client + _override_env(_FakeEnvVars(enabled=False)) + resp = await client.post( + "/test/seed", + json={ + "resource_type": "event", + "payload": { + "task_id": seeded_agent_and_task["task"].id, + "agent_id": seeded_agent_and_task["agent"].id, + }, + }, + headers={"X-Test-Seed-Token": VALID_TOKEN}, + ) + assert resp.status_code == 404 + + @pytest.mark.asyncio + async def test_prod_env_returns_404_even_with_flag_on( + self, seeding_client, seeded_agent_and_task + ): + client, _ = seeding_client + _override_env(_FakeEnvVars(enabled=True, environment=Environment.PROD)) + resp = await client.post( + "/test/seed", + json={ + "resource_type": "event", + "payload": { + "task_id": seeded_agent_and_task["task"].id, + "agent_id": seeded_agent_and_task["agent"].id, + }, + }, + headers={"X-Test-Seed-Token": VALID_TOKEN}, + ) + assert resp.status_code == 404 + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "bad_env", + [ + None, # unset env var + "", # empty string + "prod", # short form (not Environment.PROD's "production") + "Production", # case variant + "dev", # short form (not Environment.DEV's "development") + "qa", # unknown environment name + ], + ids=["unset", "empty", "prod_short", "prod_titlecase", "dev_short", "unknown"], + ) + async def test_unknown_environment_returns_404( + self, seeding_client, seeded_agent_and_task, bad_env + ): + """Allow-list, not deny-list: ENVIRONMENT is raw os.environ with no enum + coercion, so any value the gate doesn't explicitly recognize as non-prod + must fail closed.""" + client, _ = seeding_client + _override_env(_FakeEnvVars(enabled=True, environment=bad_env)) + resp = await client.post( + "/test/seed", + json={ + "resource_type": "event", + "payload": { + "task_id": seeded_agent_and_task["task"].id, + "agent_id": seeded_agent_and_task["agent"].id, + }, + }, + headers={"X-Test-Seed-Token": VALID_TOKEN}, + ) + assert resp.status_code == 404, ( + f"environment={bad_env!r} should fail closed, got {resp.status_code}" + ) + + @pytest.mark.asyncio + async def test_wrong_token_returns_404( + self, seeding_client, seeded_agent_and_task + ): + client, _ = seeding_client + resp = await client.post( + "/test/seed", + json={ + "resource_type": "event", + "payload": { + "task_id": seeded_agent_and_task["task"].id, + "agent_id": seeded_agent_and_task["agent"].id, + }, + }, + headers={"X-Test-Seed-Token": "wrong-token"}, + ) + assert resp.status_code == 404 + + @pytest.mark.asyncio + async def test_missing_token_returns_404( + self, seeding_client, seeded_agent_and_task + ): + client, _ = seeding_client + resp = await client.post( + "/test/seed", + json={ + "resource_type": "event", + "payload": { + "task_id": seeded_agent_and_task["task"].id, + "agent_id": seeded_agent_and_task["agent"].id, + }, + }, + ) + assert resp.status_code == 404 + + @pytest.mark.asyncio + async def test_token_not_configured_returns_404( + self, seeding_client, seeded_agent_and_task + ): + client, _ = seeding_client + _override_env(_FakeEnvVars(enabled=True, token=None)) + resp = await client.post( + "/test/seed", + json={ + "resource_type": "event", + "payload": { + "task_id": seeded_agent_and_task["task"].id, + "agent_id": seeded_agent_and_task["agent"].id, + }, + }, + headers={"X-Test-Seed-Token": "anything"}, + ) + assert resp.status_code == 404 + + +@pytest.mark.integration +class TestTestSeedingEvent: + @pytest.mark.asyncio + async def test_seed_event_happy_path( + self, seeding_client, seeded_agent_and_task + ): + client, repos = seeding_client + resp = await client.post( + "/test/seed", + json={ + "resource_type": "event", + "payload": { + "task_id": seeded_agent_and_task["task"].id, + "agent_id": seeded_agent_and_task["agent"].id, + "content": {"hello": "world"}, + }, + }, + headers={"X-Test-Seed-Token": VALID_TOKEN}, + ) + assert resp.status_code == 201, resp.text + body = resp.json() + assert body["task_id"] == seeded_agent_and_task["task"].id + assert body["agent_id"] == seeded_agent_and_task["agent"].id + assert body["id"] + assert isinstance(body["sequence_id"], int) + + # Verify the row is in the repo + listed = await repos["event_repository"].list_events_after_last_processed( + task_id=seeded_agent_and_task["task"].id, + agent_id=seeded_agent_and_task["agent"].id, + ) + assert len(listed) == 1 + assert listed[0].id == body["id"] + + @pytest.mark.asyncio + async def test_seed_event_audit_marker_present( + self, seeding_client, seeded_agent_and_task + ): + client, repos = seeding_client + resp = await client.post( + "/test/seed", + json={ + "resource_type": "event", + "payload": { + "task_id": seeded_agent_and_task["task"].id, + "agent_id": seeded_agent_and_task["agent"].id, + "content": {"foo": "bar"}, + }, + }, + headers={"X-Test-Seed-Token": VALID_TOKEN}, + ) + assert resp.status_code == 201, resp.text + body = resp.json() + # Response uses TaskMessageContent (api schema). The DataContentEntity + # round-trips through the API as {"type": "data", "data": {...}}. + content = body["content"] + assert content is not None + assert content["type"] == "data" + data = content["data"] + assert data.get("seeded") is True + assert "seeded_at" in data and isinstance(data["seeded_at"], str) + # Caller-provided keys preserved alongside the marker. + assert data.get("foo") == "bar" + + @pytest.mark.asyncio + async def test_seed_event_audit_marker_when_no_content( + self, seeding_client, seeded_agent_and_task + ): + client, _ = seeding_client + resp = await client.post( + "/test/seed", + json={ + "resource_type": "event", + "payload": { + "task_id": seeded_agent_and_task["task"].id, + "agent_id": seeded_agent_and_task["agent"].id, + }, + }, + headers={"X-Test-Seed-Token": VALID_TOKEN}, + ) + assert resp.status_code == 201, resp.text + data = resp.json()["content"]["data"] + assert data == {"seeded": True, "seeded_at": data["seeded_at"]} + + @pytest.mark.asyncio + async def test_seed_event_id_override( + self, seeding_client, seeded_agent_and_task + ): + client, _ = seeding_client + override_id = orm_id() + resp = await client.post( + "/test/seed", + json={ + "resource_type": "event", + "payload": { + "task_id": seeded_agent_and_task["task"].id, + "agent_id": seeded_agent_and_task["agent"].id, + "id": override_id, + }, + }, + headers={"X-Test-Seed-Token": VALID_TOKEN}, + ) + assert resp.status_code == 201, resp.text + assert resp.json()["id"] == override_id