Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion agentex/src/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
GlobalDependencies,
resolve_environment_variable_dependency,
)
from src.config.environment_variables import EnvVarKeys
from src.config.environment_variables import EnvironmentVariables, EnvVarKeys, Environment
from src.domain.exceptions import GenericException
from src.utils.logging import make_logger
from src.utils.otel_metrics import init_otel_metrics, shutdown_otel_metrics
Expand Down Expand Up @@ -195,6 +195,31 @@ async def handle_unexpected(request, exc):
fastapi_app.include_router(checkpoints.router)
fastapi_app.include_router(task_retention.router)

# Test-only seeding router (POST /test/seed).
#
# The router is mounted only when the deployment both opts in
# (ENABLE_TEST_SEEDING) and reports a known non-prod ENVIRONMENT. In every
# other deployment the route module is not even imported here, so the code
# path is unreachable by construction.
#
# The environment check is deliberately an allow-list. ENVIRONMENT is typed
# `str | None` and populated raw from os.environ with no enum coercion, so a
# deny-list against Environment.PROD would fail OPEN on unset values, "prod",
# "Production", typos, or any newly introduced environment name.
_test_seeding_env_vars = EnvironmentVariables.refresh()
_TEST_SEEDING_ALLOWED_ENVS = {Environment.DEV, Environment.STAGING}
if _test_seeding_env_vars is not None and _test_seeding_env_vars.ENABLE_TEST_SEEDING:
    if _test_seeding_env_vars.ENVIRONMENT in _TEST_SEEDING_ALLOWED_ENVS:
        # Imported lazily so the route module is only loaded when it is mounted.
        from src.api.routes import test_seeding

        fastapi_app.include_router(test_seeding.router)
        # Loud on purpose: this line showing up in production logs is an incident.
        logger.warning(
            "Test seeding endpoint /test/seed is MOUNTED. "
            "This must never happen in production.",
            extra={"environment": _test_seeding_env_vars.ENVIRONMENT},
        )

# Wrap FastAPI app with health check interceptor for sub-millisecond K8s probe responses.
# This must be the outermost layer to bypass all middleware.
# Export as `app` so existing uvicorn entry points (app:app) work without changes.
Expand Down
181 changes: 181 additions & 0 deletions agentex/src/api/routes/test_seeding.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
"""Test-only seeding endpoint.

POST /test/seed lets e2e tests insert resource rows directly, bypassing the ACP
runtime. The router is only mounted in app.py when both:
- env_vars.ENABLE_TEST_SEEDING is true
- env_vars.ENVIRONMENT is an explicitly-allowed non-prod value
(Environment.DEV or Environment.STAGING). Unknown / typo'd / unset
environments fail closed because the gate is an allow-list; ENVIRONMENT is
a raw `str | None` with no enum coercion, so a deny-list would fail open.

Per-request the endpoint also requires the X-Test-Seed-Token header to match
env_vars.TEST_SEED_TOKEN (compared with hmac.compare_digest).

All gate failures return 404 (not 401/403) to avoid advertising the route's
existence on misconfigured deployments. This file + the use case + the env
config + the mount check in app.py are the four removal points if test seeding
ever moves to a separate test-utilities image.
"""

from __future__ import annotations

import hmac
from typing import Annotated, Any, Literal
from uuid import UUID

from fastapi import APIRouter, Depends, Header, HTTPException, Request, status
from pydantic import Field

from src.api.schemas.events import Event
from src.config.dependencies import GlobalDependencies
from src.config.environment_variables import Environment, EnvironmentVariables
from src.domain.use_cases.test_seeding_use_case import DTestSeedingUseCase
from src.utils.logging import make_logger
from src.utils.model_utils import BaseModel

# Environments in which the seeding gate may pass. This is an allow-list of
# known non-prod names and must mirror the mount-time check in
# src/api/app.py: a new non-prod environment has to be added in both places.
_ALLOWED_ENVS: frozenset[str] = frozenset((Environment.DEV, Environment.STAGING))


def get_seeding_env_vars() -> EnvironmentVariables:
    """Return the environment variables consulted by the seeding gate.

    This is a named function rather than an inline lambda so that tests can
    replace it through
    ``fastapi_app.dependency_overrides[get_seeding_env_vars] = ...``. The
    process-wide ``DEnvironmentVariables`` alias is built on an inline lambda,
    which cannot be used as a key in ``dependency_overrides``.
    """
    dependencies = GlobalDependencies()
    return dependencies.environment_variables

# Module-level logger named after this module.
logger = make_logger(__name__)

# Every route declared on this router lives under the /test prefix.
router = APIRouter(prefix="/test", tags=["TestSeeding"])


# -- request payload schemas ---------------------------------------------------


class _EventSeedPayload(BaseModel):
    """Payload for seeding a single event row."""

    # Both parents are required; the descriptions state they must already exist.
    task_id: UUID = Field(
        ...,
        description="Parent task UUID. Must already exist.",
    )
    agent_id: UUID = Field(
        ...,
        description="Parent agent UUID. Must already exist.",
    )

    # Free-form event content; may be omitted entirely.
    content: dict[str, Any] | None = Field(
        default=None,
        description=(
            "Optional event content. Will be wrapped in a DataContentEntity and "
            "have audit-marker keys ('seeded', 'seeded_at') added before persist."
        ),
    )

    # Lets a test pin the event's primary key instead of receiving a generated one.
    id: UUID | None = Field(
        default=None,
        description="Optional event UUID override. Auto-generated if omitted.",
    )


class SeedEventRequest(BaseModel):
    """Discriminated request for seeding an event.

    To add a new seedable resource (task, api_key, schedule, ...), add a sibling
    `SeedXxxRequest` class with `resource_type: Literal["xxx"]`, then add it to
    the `SeedRequest` union below and dispatch in the route handler.
    """

    # Discriminator tag; "event" is the only value this variant accepts.
    resource_type: Literal["event"]
    # Event-specific fields (parent ids, optional content and id override).
    payload: _EventSeedPayload


# Request body type accepted by the seeding route.
#
# When adding a second resource type, change this to:
# SeedRequest = Annotated[
#     SeedEventRequest | SeedTaskRequest | ...,
#     Field(discriminator="resource_type"),
# ]
# For now there is a single variant, so this is a plain alias; each request
# class still carries a `resource_type` tag so the eventual extension to a
# discriminated union is mechanical.
SeedRequest = SeedEventRequest


# -- gate ----------------------------------------------------------------------


def _require_test_seeding_enabled(
    env_vars: Annotated[EnvironmentVariables, Depends(get_seeding_env_vars)],
    x_test_seed_token: Annotated[str | None, Header(alias="X-Test-Seed-Token")] = None,
) -> None:
    """Fail-closed gate for the seeding endpoint.

    The request passes only when ALL of the following hold:
      * ``env_vars.ENVIRONMENT`` is in the ``_ALLOWED_ENVS`` allow-list,
      * ``env_vars.ENABLE_TEST_SEEDING`` is truthy,
      * ``env_vars.TEST_SEED_TOKEN`` is configured (non-empty),
      * the ``X-Test-Seed-Token`` header is present and equal to that token.

    Args:
        env_vars: Environment configuration, injected via
            ``get_seeding_env_vars`` so tests can override it.
        x_test_seed_token: Value of the ``X-Test-Seed-Token`` request header,
            or ``None`` when the header is absent.

    Raises:
        HTTPException: 404 for every failure mode (never 401/403), so the
            route's existence is not advertised on misconfigured deployments.
    """
    not_found = HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Not Found"
    )

    # Hard env gate, regardless of flag. Allow-list rather than deny-list:
    # ENVIRONMENT is raw os.environ with no enum coercion, so a deny-list
    # against PROD would fail OPEN on unset / "prod" / "Production" / typos /
    # new env names. Fail closed on anything we don't explicitly recognize as
    # non-prod.
    if env_vars.ENVIRONMENT not in _ALLOWED_ENVS:
        raise not_found

    if not env_vars.ENABLE_TEST_SEEDING:
        raise not_found

    expected = env_vars.TEST_SEED_TOKEN
    if not expected:
        # No token configured -> endpoint is unusable even with the flag on.
        raise not_found

    if not x_test_seed_token:
        raise not_found

    # Compare as bytes, not str: hmac.compare_digest raises TypeError when a
    # str argument contains non-ASCII characters, which would turn a crafted
    # header into a 500 and reveal the route instead of returning 404.
    # "surrogatepass" keeps the encoding total (it never raises) while still
    # mapping distinct strings to distinct byte sequences.
    presented_bytes = x_test_seed_token.encode("utf-8", "surrogatepass")
    expected_bytes = expected.encode("utf-8", "surrogatepass")
    if not hmac.compare_digest(presented_bytes, expected_bytes):
        raise not_found


# -- route ---------------------------------------------------------------------


@router.post(
    "/seed",
    response_model=Event,
    status_code=status.HTTP_201_CREATED,
    # The gate raises 404 on any failed check (environment, flag, token).
    dependencies=[Depends(_require_test_seeding_enabled)],
)
async def seed_resource(
    body: SeedRequest,
    request: Request,
    use_case: DTestSeedingUseCase,
) -> Event:
    """Test-only direct insert. Returns the persisted resource entity.

    Extension point: when SeedRequest becomes a true union, dispatch on
    body.resource_type here. Each branch should call into its matching
    `use_case.seed_<resource>(...)` method.

    Args:
        body: Seed request; currently always the "event" variant.
        request: Incoming request, read only for an optional
            ``state.principal_context`` dict.
        use_case: Seeding use case that performs the actual insert.

    Returns:
        The ``Event`` schema validated from the entity the use case returns.

    Raises:
        HTTPException: 400 when ``resource_type`` is not handled by any branch
            (defensive fallthrough after the dispatch).
    """
    # Best-effort caller identity: stays None unless request.state carries a
    # dict-shaped principal_context with a user or service-account id.
    principal_id: str | None = None
    principal_ctx = getattr(request.state, "principal_context", None)
    if isinstance(principal_ctx, dict):
        # Prefer the user id; fall back to the service-account id.
        principal_id = principal_ctx.get("user_id") or principal_ctx.get(
            "service_account_id"
        )

    if body.resource_type == "event":
        payload = body.payload
        # UUIDs are passed to the use case as strings.
        event_entity = await use_case.seed_event(
            task_id=str(payload.task_id),
            agent_id=str(payload.agent_id),
            content=payload.content,
            id_override=str(payload.id) if payload.id is not None else None,
            principal_id=principal_id,
        )
        return Event.model_validate(event_entity)
Comment on lines +166 to +175

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I get the overall idea here.

I don't think seeding just events is enough, because event permissions depend on the agent, right? If we only want to seed events, why not mock out the DB instead of creating a whole endpoint to create DB entries?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also what is the eventual intention of these tests we are writing? Do we intend them to run on each PR/each deployment?


# Defensive: the discriminator should make this unreachable.
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Unsupported resource_type: {body.resource_type!r}",
)
11 changes: 11 additions & 0 deletions agentex/src/config/environment_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class EnvVarKeys(str, Enum):
RETENTION_CLEANUP_PAGE_SIZE = "RETENTION_CLEANUP_PAGE_SIZE"
RETENTION_CLEANUP_MAX_IN_FLIGHT = "RETENTION_CLEANUP_MAX_IN_FLIGHT"
RETENTION_CLEANUP_DRY_RUN = "RETENTION_CLEANUP_DRY_RUN"
ENABLE_TEST_SEEDING = "ENABLE_TEST_SEEDING"
TEST_SEED_TOKEN = "TEST_SEED_TOKEN"


class Environment(str, Enum):
Expand Down Expand Up @@ -128,6 +130,11 @@ class EnvironmentVariables(BaseModel):
RETENTION_CLEANUP_PAGE_SIZE: int = 200
RETENTION_CLEANUP_MAX_IN_FLIGHT: int = 20
RETENTION_CLEANUP_DRY_RUN: bool = True
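# Test-only seeding gate. ENABLE_TEST_SEEDING controls whether the /test/seed
# router is mounted; TEST_SEED_TOKEN is the shared secret checked on every
# request, so both must be set for the endpoint to be usable. Defaults are
# fail-closed: the endpoint is unreachable in any env that doesn't explicitly
# opt in. Hard-gated off outside the allow-listed non-prod environments
# (DEV, STAGING) regardless of the flag.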
ENABLE_TEST_SEEDING: bool = False
TEST_SEED_TOKEN: str | None = None

@classmethod
def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None:
Expand Down Expand Up @@ -242,6 +249,10 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None:
RETENTION_CLEANUP_DRY_RUN=(
os.environ.get(EnvVarKeys.RETENTION_CLEANUP_DRY_RUN, "true") == "true"
),
ENABLE_TEST_SEEDING=(
os.environ.get(EnvVarKeys.ENABLE_TEST_SEEDING, "false") == "true"
),
TEST_SEED_TOKEN=os.environ.get(EnvVarKeys.TEST_SEED_TOKEN),
)
refreshed_environment_variables = environment_variables
return refreshed_environment_variables
Expand Down
124 changes: 124 additions & 0 deletions agentex/src/domain/use_cases/test_seeding_use_case.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
"""Test-only seeding use case.

Inserts resource rows directly into the repositories without going through the
ACP runtime. The endpoint that calls into this use case is mounted only when
ENABLE_TEST_SEEDING is true AND ENVIRONMENT is an explicitly allow-listed
non-prod value (Environment.DEV or Environment.STAGING); each request must also
present a shared-secret header. See
src/api/routes/test_seeding.py.

This module is deliberately isolated so it can be deleted in one surgical
removal when/if test seeding moves into a separate test-utilities image.
"""

from __future__ import annotations

from datetime import datetime, timezone
from typing import Annotated, Any

from fastapi import Depends

from src.domain.entities.events import EventEntity
from src.domain.entities.task_messages import (
DataContentEntity,
MessageAuthor,
TaskMessageContentEntity,
TaskMessageContentType,
)
from src.domain.repositories.event_repository import DEventRepository
from src.utils.ids import orm_id
from src.utils.logging import make_logger

logger = make_logger(__name__)


class TestSeedingUseCase: # noqa: PT001 — not a pytest class; "Test" prefix is the use-case domain name
"""Test-only resource seeding.
Comment on lines +34 to +35

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Python, only a string literal that is the first statement in the class body is treated as a docstring. Because __test__ = False appears first, this multi-line string is just a discarded expression — TestSeedingUseCase.__doc__ will be None. Swapping the two lines fixes it without any functional change.

Suggested change
class TestSeedingUseCase: # noqa: PT001 — not a pytest class; "Test" prefix is the use-case domain name
__test__ = False # tell pytest not to collect this as a test class
"""Test-only resource seeding.
class TestSeedingUseCase: # noqa: PT001 — not a pytest class; "Test" prefix is the use-case domain name
"""Test-only resource seeding.


Each `seed_<resource>` method writes a row directly via the matching
repository, mirroring the persistence half of the natural-flow write path
but skipping any downstream side effects (ACP forwards, etc.).

NOTE on FGAC: events are not a first-class FGAC resource
(AgentexResourceType has only agent/task/api_key/schedule). Event authz
delegates to the parent agent. When seeding future FGAC-registered
resources (task, api_key, schedule), the corresponding seed_* method MUST
also call authorization_service.register_resource(...) before persisting,
mirroring the pattern in agent_api_keys_use_case._register_api_key_in_auth.
"""

# Tell pytest not to collect this as a test class. Must come AFTER the
# docstring — Python only treats a string literal as __doc__ when it's
# the FIRST statement in the class body.
__test__ = False

def __init__(self, event_repository: DEventRepository) -> None:
    """Store the event repository that ``seed_event`` writes through.

    Args:
        event_repository: Repository used to create event rows.
    """
    self.event_repository = event_repository

async def seed_event(
    self,
    *,
    task_id: str,
    agent_id: str,
    content: dict[str, Any] | None = None,
    id_override: str | None = None,
    principal_id: str | None = None,
) -> EventEntity:
    """Write one synthetic event row and return the created entity.

    The persisted content always carries the audit marker
    `{"seeded": true, "seeded_at": <iso8601>}` so downstream tests can filter
    for seeded rows. Marker keys win over same-named keys in `content`.

    Args:
        task_id: Id of the parent task.
        agent_id: Id of the parent agent.
        content: Optional caller-supplied data, merged under the audit marker.
        id_override: Event id to use; a fresh `orm_id()` is used when falsy.
        principal_id: Caller identity, recorded in the log line only.

    Returns:
        The entity returned by the event repository's `create`.
    """
    event_id = id_override or orm_id()
    seeded_at = datetime.now(timezone.utc).isoformat()

    # Caller data first, audit marker on top. With no caller data only the
    # marker is persisted: events.content is nullable, but the marker should
    # always be present, and DATA is the only content type that accepts an
    # arbitrary dict.
    data: dict[str, Any] = {
        **(content or {}),
        "seeded": True,
        "seeded_at": seeded_at,
    }

    # The author field is required by BaseTaskMessageContentEntity but has no
    # semantic meaning for seeded rows, so synthetic events are marked as
    # agent-authored; the {"seeded": true} marker in `data` is the real
    # signal for downstream filtering.
    event = await self.event_repository.create(
        id=event_id,
        task_id=task_id,
        agent_id=agent_id,
        content=DataContentEntity(
            type=TaskMessageContentType.DATA,
            author=MessageAuthor.AGENT,
            data=data,
        ),
    )

    audit_fields = {
        "resource_type": "event",
        "resource_id": event.id,
        "principal_id": principal_id,
        "task_id": task_id,
        "agent_id": agent_id,
    }
    logger.info("test seeding wrote resource", extra=audit_fields)

    # TODO when adding seed_task / seed_api_key / seed_schedule:
    # FGAC-registered resources MUST also call
    #     authorization_service.register_resource(
    #         resource=AgentexResource.<type>(<id>),
    #         parent=AgentexResource.agent(<agent_id>),
    #     )
    # BEFORE persisting, mirroring agent_api_keys_use_case._register_api_key_in_auth.
    # Events are exempt because event authz delegates to the parent agent
    # (which must already exist & already be registered).

    return event


# Dependency-injection alias: a handler parameter annotated with this type
# receives a TestSeedingUseCase resolved through Depends(TestSeedingUseCase).
DTestSeedingUseCase = Annotated[TestSeedingUseCase, Depends(TestSeedingUseCase)]
Loading
Loading