-
Notifications
You must be signed in to change notification settings - Fork 46
feat(test-seeding): add gated POST /test/seed for e2e test resource seeding #301
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
ec9dbed
5569e47
a6ecc93
49bf903
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,181 @@ | ||
| """Test-only seeding endpoint. | ||
|
|
||
| POST /test/seed lets e2e tests insert resource rows directly, bypassing the ACP | ||
| runtime. The router is only mounted in app.py when both: | ||
| - env_vars.ENABLE_TEST_SEEDING is true | ||
| - env_vars.ENVIRONMENT is an explicitly-allowed non-prod value | ||
| (Environment.DEV or Environment.STAGING). Unknown / typo'd / unset | ||
| environments fail closed because ENVIRONMENT is typed `str | None` | ||
| with no enum coercion. | ||
|
|
||
| Per-request the endpoint also requires the X-Test-Seed-Token header to match | ||
| env_vars.TEST_SEED_TOKEN (compared with hmac.compare_digest). | ||
|
|
||
| All gate failures return 404 (not 401/403) to avoid advertising the route's | ||
| existence on misconfigured deployments. This file + the use case + the env | ||
| config + the mount check in app.py are the four removal points if test seeding | ||
| ever moves to a separate test-utilities image. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import hmac | ||
| from typing import Annotated, Any, Literal | ||
| from uuid import UUID | ||
|
|
||
| from fastapi import APIRouter, Depends, Header, HTTPException, Request, status | ||
| from pydantic import Field | ||
|
|
||
| from src.api.schemas.events import Event | ||
| from src.config.dependencies import GlobalDependencies | ||
| from src.config.environment_variables import Environment, EnvironmentVariables | ||
| from src.domain.use_cases.test_seeding_use_case import DTestSeedingUseCase | ||
| from src.utils.logging import make_logger | ||
| from src.utils.model_utils import BaseModel | ||
|
|
||
| # Allow-list of non-prod environment names for the seeding gate. Kept in sync | ||
| # with the mount-time check in src/api/app.py — any new non-prod environment | ||
| # must be added in both places. | ||
| _ALLOWED_ENVS: frozenset[str] = frozenset({Environment.DEV, Environment.STAGING}) | ||
|
|
||
|
|
||
| def get_seeding_env_vars() -> EnvironmentVariables: | ||
| """Named dependency callable for the seeding gate. | ||
|
|
||
| Defined as a named function (not an inline lambda) so tests can override it | ||
| via ``fastapi_app.dependency_overrides[get_seeding_env_vars] = ...``. The | ||
| process-wide ``DEnvironmentVariables`` alias uses an inline lambda which | ||
| cannot be keyed in dependency_overrides. | ||
| """ | ||
| return GlobalDependencies().environment_variables | ||
|
|
||
| logger = make_logger(__name__) | ||
|
|
||
| router = APIRouter(prefix="/test", tags=["TestSeeding"]) | ||
|
|
||
|
|
||
| # -- request payload schemas --------------------------------------------------- | ||
|
|
||
|
|
||
| class _EventSeedPayload(BaseModel): | ||
| """Payload for seeding a single event row.""" | ||
|
|
||
| task_id: UUID = Field(..., description="Parent task UUID. Must already exist.") | ||
| agent_id: UUID = Field(..., description="Parent agent UUID. Must already exist.") | ||
| content: dict[str, Any] | None = Field( | ||
| None, | ||
| description=( | ||
| "Optional event content. Will be wrapped in a DataContentEntity and " | ||
| "have audit-marker keys ('seeded', 'seeded_at') added before persist." | ||
| ), | ||
| ) | ||
| id: UUID | None = Field( | ||
| None, | ||
| description="Optional event UUID override. Auto-generated if omitted.", | ||
| ) | ||
|
|
||
|
|
||
| class SeedEventRequest(BaseModel): | ||
| """Discriminated request for seeding an event. | ||
|
|
||
| To add a new seedable resource (task, api_key, schedule, ...), add a sibling | ||
| `SeedXxxRequest` class with `resource_type: Literal["xxx"]`, then add it to | ||
| the `SeedRequest` union below and dispatch in the route handler. | ||
| """ | ||
|
|
||
| resource_type: Literal["event"] | ||
| payload: _EventSeedPayload | ||
|
|
||
|
|
||
| # When adding a second resource type, change this to: | ||
| # SeedRequest = Annotated[ | ||
| # SeedEventRequest | SeedTaskRequest | ..., | ||
| # Field(discriminator="resource_type"), | ||
| # ] | ||
| # For now there is a single variant; we keep the discriminated shape so the | ||
| # eventual extension is mechanical. | ||
| SeedRequest = SeedEventRequest | ||
|
|
||
|
|
||
| # -- gate ---------------------------------------------------------------------- | ||
|
|
||
|
|
||
| def _require_test_seeding_enabled( | ||
| env_vars: Annotated[EnvironmentVariables, Depends(get_seeding_env_vars)], | ||
| x_test_seed_token: Annotated[str | None, Header(alias="X-Test-Seed-Token")] = None, | ||
| ) -> None: | ||
| """Fail-closed gate for the seeding endpoint. | ||
|
|
||
| All failure modes return 404 (not 401/403) so we don't advertise that the | ||
| route exists on misconfigured deployments. Token comparison is constant-time. | ||
| """ | ||
| not_found = HTTPException( | ||
| status_code=status.HTTP_404_NOT_FOUND, detail="Not Found" | ||
| ) | ||
|
|
||
| # Hard env gate, regardless of flag. Allow-list rather than deny-list: | ||
| # ENVIRONMENT is raw os.environ with no enum coercion, so a deny-list | ||
| # against PROD would fail OPEN on unset / "prod" / "Production" / typos / | ||
| # new env names. Fail closed on anything we don't explicitly recognize as | ||
| # non-prod. | ||
| if env_vars.ENVIRONMENT not in _ALLOWED_ENVS: | ||
| raise not_found | ||
|
|
||
| if not env_vars.ENABLE_TEST_SEEDING: | ||
| raise not_found | ||
|
|
||
| expected = env_vars.TEST_SEED_TOKEN | ||
| if not expected: | ||
| # No token configured -> endpoint is unusable even with the flag on. | ||
| raise not_found | ||
|
|
||
| if not x_test_seed_token: | ||
| raise not_found | ||
|
|
||
| if not hmac.compare_digest(x_test_seed_token, expected): | ||
| raise not_found | ||
|
|
||
|
|
||
| # -- route --------------------------------------------------------------------- | ||
|
|
||
|
|
||
| @router.post( | ||
| "/seed", | ||
| response_model=Event, | ||
| status_code=status.HTTP_201_CREATED, | ||
| dependencies=[Depends(_require_test_seeding_enabled)], | ||
| ) | ||
| async def seed_resource( | ||
| body: SeedRequest, | ||
| request: Request, | ||
| use_case: DTestSeedingUseCase, | ||
| ) -> Event: | ||
| """Test-only direct insert. Returns the persisted resource entity. | ||
|
|
||
| Extension point: when SeedRequest becomes a true union, dispatch on | ||
| body.resource_type here. Each branch should call into its matching | ||
| `use_case.seed_<resource>(...)` method. | ||
| """ | ||
| principal_id: str | None = None | ||
| principal_ctx = getattr(request.state, "principal_context", None) | ||
| if isinstance(principal_ctx, dict): | ||
| principal_id = principal_ctx.get("user_id") or principal_ctx.get( | ||
| "service_account_id" | ||
| ) | ||
|
|
||
| if body.resource_type == "event": | ||
| payload = body.payload | ||
| event_entity = await use_case.seed_event( | ||
| task_id=str(payload.task_id), | ||
| agent_id=str(payload.agent_id), | ||
| content=payload.content, | ||
| id_override=str(payload.id) if payload.id is not None else None, | ||
| principal_id=principal_id, | ||
| ) | ||
| return Event.model_validate(event_entity) | ||
|
|
||
| # Defensive: the discriminator should make this unreachable. | ||
| raise HTTPException( | ||
| status_code=status.HTTP_400_BAD_REQUEST, | ||
| detail=f"Unsupported resource_type: {body.resource_type!r}", | ||
| ) | ||
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,124 @@ | ||||||||||||
| """Test-only seeding use case. | ||||||||||||
|
|
||||||||||||
| Inserts resource rows directly into the repositories without going through the | ||||||||||||
| ACP runtime. Mounted only when ENABLE_TEST_SEEDING is true AND | ||||||||||||
| ENVIRONMENT != production. The endpoint that calls into this use case is gated | ||||||||||||
| to the same conditions plus a shared-secret header. See | ||||||||||||
| src/api/routes/test_seeding.py. | ||||||||||||
|
|
||||||||||||
| This module is deliberately isolated so it can be deleted in one surgical | ||||||||||||
| removal when/if test seeding moves into a separate test-utilities image. | ||||||||||||
| """ | ||||||||||||
|
|
||||||||||||
| from __future__ import annotations | ||||||||||||
|
|
||||||||||||
| from datetime import datetime, timezone | ||||||||||||
| from typing import Annotated, Any | ||||||||||||
|
|
||||||||||||
| from fastapi import Depends | ||||||||||||
|
|
||||||||||||
| from src.domain.entities.events import EventEntity | ||||||||||||
| from src.domain.entities.task_messages import ( | ||||||||||||
| DataContentEntity, | ||||||||||||
| MessageAuthor, | ||||||||||||
| TaskMessageContentEntity, | ||||||||||||
| TaskMessageContentType, | ||||||||||||
| ) | ||||||||||||
| from src.domain.repositories.event_repository import DEventRepository | ||||||||||||
| from src.utils.ids import orm_id | ||||||||||||
| from src.utils.logging import make_logger | ||||||||||||
|
|
||||||||||||
| logger = make_logger(__name__) | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| class TestSeedingUseCase: # noqa: PT001 — not a pytest class; "Test" prefix is the use-case domain name | ||||||||||||
| """Test-only resource seeding. | ||||||||||||
|
Comment on lines
+34
to
+35
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In Python, only a string literal that is the first statement in the class body is treated as a docstring. Because
Suggested change
|
||||||||||||
|
|
||||||||||||
| Each `seed_<resource>` method writes a row directly via the matching | ||||||||||||
| repository, mirroring the persistence half of the natural-flow write path | ||||||||||||
| but skipping any downstream side effects (ACP forwards, etc.). | ||||||||||||
|
|
||||||||||||
| NOTE on FGAC: events are not a first-class FGAC resource | ||||||||||||
| (AgentexResourceType has only agent/task/api_key/schedule). Event authz | ||||||||||||
| delegates to the parent agent. When seeding future FGAC-registered | ||||||||||||
| resources (task, api_key, schedule), the corresponding seed_* method MUST | ||||||||||||
| also call authorization_service.register_resource(...) before persisting, | ||||||||||||
| mirroring the pattern in agent_api_keys_use_case._register_api_key_in_auth. | ||||||||||||
| """ | ||||||||||||
|
|
||||||||||||
| # Tell pytest not to collect this as a test class. Must come AFTER the | ||||||||||||
| # docstring — Python only treats a string literal as __doc__ when it's | ||||||||||||
| # the FIRST statement in the class body. | ||||||||||||
| __test__ = False | ||||||||||||
|
|
||||||||||||
| def __init__(self, event_repository: DEventRepository) -> None: | ||||||||||||
| self.event_repository = event_repository | ||||||||||||
|
|
||||||||||||
| async def seed_event( | ||||||||||||
| self, | ||||||||||||
| *, | ||||||||||||
| task_id: str, | ||||||||||||
| agent_id: str, | ||||||||||||
| content: dict[str, Any] | None = None, | ||||||||||||
| id_override: str | None = None, | ||||||||||||
| principal_id: str | None = None, | ||||||||||||
| ) -> EventEntity: | ||||||||||||
| """Seed a single event row. | ||||||||||||
|
|
||||||||||||
| Injects an audit marker `{"seeded": true, "seeded_at": <iso8601>}` into | ||||||||||||
| the persisted content so downstream tests can filter for seeded rows. | ||||||||||||
| """ | ||||||||||||
| event_id = id_override or orm_id() | ||||||||||||
| seeded_at = datetime.now(timezone.utc).isoformat() | ||||||||||||
|
|
||||||||||||
| # Build the persisted content: start with any caller-supplied dict, then | ||||||||||||
| # overlay the audit marker. If no content was supplied, persist just the | ||||||||||||
| # marker as a DataContentEntity (events.content is nullable but we want | ||||||||||||
| # the marker to always be present, and DATA is the only content type | ||||||||||||
| # that accepts an arbitrary dict). | ||||||||||||
| merged: dict[str, Any] = dict(content) if content else {} | ||||||||||||
| merged["seeded"] = True | ||||||||||||
| merged["seeded_at"] = seeded_at | ||||||||||||
|
|
||||||||||||
| # Seeded events are synthetic; mark them as agent-authored. The author | ||||||||||||
| # field is required by BaseTaskMessageContentEntity but has no | ||||||||||||
| # semantic meaning for seeded rows -- the {"seeded": true} audit | ||||||||||||
| # marker in `data` is the actual signal for downstream filtering. | ||||||||||||
| content_entity: TaskMessageContentEntity = DataContentEntity( | ||||||||||||
| type=TaskMessageContentType.DATA, | ||||||||||||
| author=MessageAuthor.AGENT, | ||||||||||||
| data=merged, | ||||||||||||
| ) | ||||||||||||
|
|
||||||||||||
| event = await self.event_repository.create( | ||||||||||||
| id=event_id, | ||||||||||||
| task_id=task_id, | ||||||||||||
| agent_id=agent_id, | ||||||||||||
| content=content_entity, | ||||||||||||
| ) | ||||||||||||
|
|
||||||||||||
| logger.info( | ||||||||||||
| "test seeding wrote resource", | ||||||||||||
| extra={ | ||||||||||||
| "resource_type": "event", | ||||||||||||
| "resource_id": event.id, | ||||||||||||
| "principal_id": principal_id, | ||||||||||||
| "task_id": task_id, | ||||||||||||
| "agent_id": agent_id, | ||||||||||||
| }, | ||||||||||||
| ) | ||||||||||||
|
|
||||||||||||
| # TODO when adding seed_task / seed_api_key / seed_schedule: | ||||||||||||
| # FGAC-registered resources MUST also call | ||||||||||||
| # authorization_service.register_resource( | ||||||||||||
| # resource=AgentexResource.<type>(<id>), | ||||||||||||
| # parent=AgentexResource.agent(<agent_id>), | ||||||||||||
| # ) | ||||||||||||
| # BEFORE persisting, mirroring agent_api_keys_use_case._register_api_key_in_auth. | ||||||||||||
| # Events are exempt from this because event authz delegates to the parent | ||||||||||||
| # agent (which must already exist & already be registered). | ||||||||||||
|
|
||||||||||||
| return event | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| DTestSeedingUseCase = Annotated[TestSeedingUseCase, Depends(TestSeedingUseCase)] | ||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I get the overall idea here.
I dont think seeding just event is enough because event permissions depend on the agent right? If we want to just seed the events why not just mock out the DB instead of creating a whole endpoint to create DB entries?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also what is the eventual intention of these tests we are writing? Do we intend them to run on each PR/each deployment?