27 changes: 23 additions & 4 deletions CLAUDE.md
@@ -426,12 +426,14 @@ Each endpoint takes a single `ContextDep`. The handler calls `context.authorize(
| GET | `/api/3/action/datastore_info` | **implemented** | `DatastoreInfoRequest` | `DatastoreInfoResponse` |
| GET | `/datastore/dump/{resource_id}` | **implemented** | `format=csv\|ndjson\|parquet` | 302 → GCS *or* streaming body (see §5.3) |

The BigQuery engine is wired end-to-end: DDL, MERGE-based upsert, DML delete, parameterised search, `_table_metadata` for Frictionless schema + unique_key round-trip, a row-count fast path via `INFORMATION_SCHEMA.TABLE_STORAGE`, and `EXPORT DATA`-backed dump with `table.modified`-keyed GCS caching. The DuckLake engine is the next concrete adapter — see §7.
The BigQuery engine is wired end-to-end: DDL, MERGE-based upsert, DML delete, parameterised search, native table-level metadata (the Frictionless schema + unique_key are JSON-encoded into the table's own `description` OPTION) for the schema round-trip, a row-count fast path via `INFORMATION_SCHEMA.TABLE_STORAGE`, and `EXPORT DATA`-backed dump with `table.modified`-keyed GCS caching. The DuckLake engine is the next concrete adapter — see §7.
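
A minimal sketch of the schema round-trip described above, assuming the metadata is stored as one JSON object with `schema` and `unique_key` keys. That layout and the helper names `encode_table_description` / `decode_table_description` are illustrative assumptions, not taken from the engine's code.

```python
from __future__ import annotations

import json
from typing import Any


def encode_table_description(schema: dict[str, Any], unique_key: list[str]) -> str:
    """Serialise table-level metadata into one string for the table description.

    Assumption: the {"schema": ..., "unique_key": ...} layout is hypothetical;
    the real engine may lay the payload out differently.
    """
    return json.dumps({"schema": schema, "unique_key": unique_key}, sort_keys=True)


def decode_table_description(
    description: str | None,
) -> tuple[dict[str, Any], list[str]]:
    """Recover (schema, unique_key); return empty values when nothing usable is stored."""
    if not description:
        return {}, []
    try:
        payload = json.loads(description)
    except json.JSONDecodeError:
        # A hand-written description that is not JSON carries no metadata.
        return {}, []
    if not isinstance(payload, dict):
        return {}, []
    return payload.get("schema", {}), payload.get("unique_key", [])
```

Keeping the metadata on the table itself means there is no second object to keep in sync when a table is created or dropped, which appears to be the motivation for dropping the separate metadata table.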

`datastore_create` accepts two shapes:

- `resource_id` — table name only. Works under any `AUTH_TYPE`.
- `resource` (dict) — calls `ckan.resource_create(...)` first to materialise a CKAN resource, then writes the datastore table. **Only valid under `AUTH_TYPE=ckan`**; the endpoint rejects this shape with a `Validation Error` under JWT / anonymous since there's no CKAN to land it.
- `resource` (dict) — calls `ckan.resource_create(...)` first to materialise a CKAN resource, then writes the datastore table. The resource is created with `url_type="datastore"` so CKAN (and the read-only guard below) knows the datastore owns its data. **Only valid under `AUTH_TYPE=ckan`**; the endpoint rejects this shape with a `Validation Error` under JWT / anonymous since there's no CKAN to land it.

**Read-only guard (`AUTH_TYPE=ckan` only).** `datastore_create`, `datastore_upsert`, and `datastore_delete` refuse to write to a resource whose CKAN record carries `url_type="datastore"` unless the request sets `force: true`; otherwise they return a `Validation Error` ("Cannot update a read-only resource. Use \"force\" to force update."). This mirrors CKAN's protection against clobbering datastore-managed data by accident. The guard is gated on `AUTH_TYPE=ckan` and skipped entirely under any other provider, because only the CKAN provider attaches a resource record.
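
The guard's decision table can be sketched as a pure function. `guard_blocks_write` is a hypothetical name used only for illustration, and the non-CKAN `auth_type` string in the usage note is an assumed value.

```python
from __future__ import annotations


def guard_blocks_write(auth_type: str, url_type: str | None, force: bool) -> bool:
    """Return True when the read-only guard would reject the write."""
    if auth_type != "ckan":
        # Other providers attach no CKAN resource record, so the guard is skipped.
        return False
    # Only datastore-managed resources are protected, and `force` overrides that.
    return url_type == "datastore" and not force
```

Under this sketch, `guard_blocks_write("ckan", "datastore", False)` is the only combination that blocks: setting `force=True`, using a resource with a different `url_type`, or running under a non-CKAN provider (for example an assumed `"jwt"` value) all let the write through.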

### 5.3 `GET /datastore/dump/{resource_id}`

@@ -813,7 +815,8 @@ GET /api/3/datastore_search_sql?sql=
"force": false
}
```
Empty `filters` (or omitted) → the entire table is dropped.
Empty `filters` (or omitted) → the entire table is dropped. Passing `fields`
(mutually exclusive with `filters`) drops those columns instead of rows.
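
The three delete modes described above can be sketched as a small dispatcher. This is an illustration under assumptions: `classify_delete` and the mode labels are hypothetical, and an empty `fields` list is treated the same as an omitted one.

```python
from __future__ import annotations

from typing import Any


def classify_delete(
    filters: dict[str, Any] | None = None,
    fields: list[str] | None = None,
) -> str:
    """Map a datastore_delete payload to the operation it requests."""
    if filters and fields:
        # The two selectors cannot be combined in one request.
        raise ValueError("`filters` and `fields` are mutually exclusive")
    if fields:
        return "drop_columns"
    if filters:
        return "delete_rows"
    # Neither selector supplied (or both empty): the whole table goes.
    return "drop_table"
```

For example, `classify_delete(fields=["bidder_metadata"])` selects the column-drop path, while `classify_delete()` selects the whole-table drop.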

**Response**
```json
@@ -824,6 +827,22 @@ Empty `filters` (or omitted) → the entire table is dropped.
}
```

When `fields` is supplied (column drop), `result` also carries `schema` — the
Frictionless Table Schema after the listed columns were removed — so the caller
can confirm the table's new shape without a follow-up `datastore_info`:

```json
{
"help": "...",
"success": true,
"result": {
"resource_id": "balancing_auction_results_2025",
"fields": ["bidder_metadata"],
"schema": {"fields": [{"id": "auction_id", "type": "integer"}, "..."], "primaryKey": ["auction_id", "product_code"]}
}
}
```
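
How the echoed `schema` could be derived is sketched below. `schema_without_fields` is a hypothetical helper, and the sketch assumes primary-key columns are never among the dropped ones, so `primaryKey` is copied unchanged.

```python
from __future__ import annotations

import copy
from typing import Any


def schema_without_fields(schema: dict[str, Any], dropped: list[str]) -> dict[str, Any]:
    """Return a copy of a Frictionless-style schema minus the dropped columns.

    Fields are matched on their "id" key, following the response example.
    Assumption: `primaryKey` never references a dropped column.
    """
    dropped_ids = set(dropped)
    result = copy.deepcopy(schema)  # leave the caller's schema untouched
    result["fields"] = [
        field for field in result.get("fields", []) if field.get("id") not in dropped_ids
    ]
    return result
```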

### 6.6 `GET /api/3/datastore_info`

Returns the same field shape that was supplied to `datastore_create`, including
@@ -906,7 +925,7 @@ The original phase plan that used to live here has mostly shipped. This section

- [x] **Foundation** — `pyproject.toml`, `Dockerfile`, `Makefile`, `.env.example`, `docker-compose.yml`. App factory + lifespan in [datastore/main.py](datastore/main.py); body-size middleware in [datastore/api/middleware.py](datastore/api/middleware.py); startup log line via `uvicorn.error` showing the active engine + auth provider + cache backend.
- [x] **All six `datastore_*` actions wired** — `create`, `upsert`, `delete`, `search`, `search_sql`, `info` mounted via [datastore/api/routes.py](datastore/api/routes.py). Every endpoint authorizes via `context.authorize(...)` and delegates to a service.
- [x] **Real BigQuery backend** — [datastore/infrastructure/engines/bigquery/](datastore/infrastructure/engines/bigquery/) implements DDL, parameterised `search`, MERGE-based `upsert` (`method=upsert` / `insert` / `update`), DML `delete` (whole-table drop, row delete, column drop), parameterised `search_sql`, and `info`. Frictionless schema + `unique_key` round-trip via the `_table_metadata` table. Row counts use the cheap `INFORMATION_SCHEMA.TABLE_STORAGE` fast path when filters don't apply.
- [x] **Real BigQuery backend** — [datastore/infrastructure/engines/bigquery/](datastore/infrastructure/engines/bigquery/) implements DDL, parameterised `search`, MERGE-based `upsert` (`method=upsert` / `insert` / `update`), DML `delete` (whole-table drop, row delete, column drop), parameterised `search_sql`, and `info`. Frictionless schema + `unique_key` round-trip via native table-level metadata — JSON-encoded into the table's own `description` OPTION (no separate metadata table). Row counts use the cheap `INFORMATION_SCHEMA.TABLE_STORAGE` fast path when filters don't apply.
- [x] **Streaming search** — [datastore/services/streaming.py](datastore/services/streaming.py) yields the CKAN envelope chunk-by-chunk for all four `records_format` values (`objects`, `lists`, `csv`, `tsv`); CSV/TSV ride the same JSON envelope (records is a multi-line string). Peak memory ≈ 1 row regardless of N. `_links.start` / `_links.next` carry full scheme + host with all non-`offset` params preserved.
- [x] **`datastore_search_sql` SQL safety** — schema rejects non-SELECT / multi-statement / unparseable SQL (sqlglot). [datastore/schemas/validators.py](datastore/schemas/validators.py)'s `parse_sql_references` pulls table + function names; endpoint authorizes each table as a `resource_id`; service rejects functions outside the engine's allow-list at `engines/<name>/allowed_functions.txt` (overridable via `SQL_FUNCTIONS_ALLOW_FILE`).
- [x] **Request validation** — Pydantic models in [datastore/schemas/request.py](datastore/schemas/request.py) with `extra="forbid"`. `datastore_info` / `datastore_delete` accept `resource_id` or `id` (normalised). Pydantic errors → CKAN error envelope with a `fields` map.
22 changes: 22 additions & 0 deletions datastore/api/auth.py
@@ -61,3 +61,25 @@ async def authorize(
permission=permission,
)
return {"resource": decision.resource or {}, "package": decision.package or {}}


def ensure_resource_writable(
resource: dict[str, Any], *, force: bool, auth_type: str
) -> None:
"""Block writes to a CKAN datastore-managed resource unless `force`.

CKAN tags resources whose data the datastore owns with
`url_type="datastore"`; editing one through datastore_create /
_upsert / _delete requires an explicit `force` so managed data isn't
clobbered by accident.

Applies under `AUTH_TYPE="ckan"` only — that's the sole provider that
attaches a CKAN resource record. Other providers carry no such record,
so the guard is skipped entirely.
"""
if auth_type != "ckan":
return
if not force and resource.get("url_type") == "datastore":
raise ValidationError(
'Cannot update a read-only resource. Use "force" to force update.'
)
23 changes: 22 additions & 1 deletion datastore/api/endpoints/datastore.py
@@ -6,6 +6,7 @@
from starlette.requests import Request
from starlette.responses import StreamingResponse

from datastore.api.auth import ensure_resource_writable
from datastore.api.context import Context
from datastore.api.responses import _deprecation_warnings, _success_response
from datastore.core.exceptions import ValidationError
@@ -63,6 +64,14 @@ async def datastore_create(
permission="create",
)

# Refuse to re-declare a datastore-managed resource unless forced
# (CKAN auth only; no-op on the new-resource dict path).
ensure_resource_writable(
data_dict["resource"],
force=bool(payload.force),
auth_type=context.config.AUTH_TYPE,
)

data_dict.update(
{
"resource": payload.resource_id or payload.resource,
@@ -90,6 +99,11 @@ async def datastore_upsert(
resource_id=payload.resource_id,
permission="update",
)
ensure_resource_writable(
data_dict["resource"],
force=payload.force,
auth_type=context.config.AUTH_TYPE,
)
data_dict.update(payload.model_dump())
result = await upsert_datastore(context, data_dict)
return _success_response(request, result)
@@ -181,6 +195,13 @@ async def datastore_delete(
Returns the original `filters` echoed back (CKAN convention) so the
caller can confirm what the server actually applied.
"""
await context.authorize(resource_id=payload.resource_id, permission="delete")
data_dict = await context.authorize(
resource_id=payload.resource_id, permission="delete"
)
ensure_resource_writable(
data_dict["resource"],
force=payload.force,
auth_type=context.config.AUTH_TYPE,
)
result = await delete_datastore(context, payload.model_dump())
return _success_response(request, result)
5 changes: 2 additions & 3 deletions datastore/core/config.py
@@ -100,9 +100,8 @@ def _check_engine_available(cls, v: str) -> str:
BIGQUERY_DATASET: str = Field(
default="",
description=(
"BigQuery dataset that holds the datastore tables. Both the "
"per-resource data tables and the internal `_table_metadata` "
"table live here. Required when DATASTORE_ENGINE=bigquery."
"BigQuery dataset that holds the per-resource datastore "
"tables. Required when DATASTORE_ENGINE=bigquery."
),
)
BIGQUERY_CREDENTIALS: str = Field(
2 changes: 1 addition & 1 deletion datastore/infrastructure/ckan_client.py
@@ -54,7 +54,7 @@ async def datastore_authorize(
permission: str | None = None,
) -> dict[str, Any]:
"""`/api/3/action/datastore_authorize`.
Authorize resource and package.
Authorize resource and package.
"""
if (resource_id is None) == (package_id is None):
raise ValidationError(
52 changes: 5 additions & 47 deletions datastore/infrastructure/engines/base.py
@@ -3,7 +3,7 @@
from abc import ABC, abstractmethod
from collections.abc import Iterator
from dataclasses import dataclass
from typing import Any, Protocol, runtime_checkable
from typing import Any


@dataclass
@@ -31,6 +31,10 @@ def columns(self) -> list[str]:
class WriteResult:
rows_written: int = 0
total: int | None = None
# Resulting Frictionless schema after the write — populated by
# `delete()`'s column-drop path so the response can echo the table's
# shape minus the dropped columns. `None` for other write paths.
schema: dict[str, Any] | None = None


@dataclass
@@ -44,52 +48,6 @@ class InfoResult:
meta: dict[str, Any]


@runtime_checkable
class MetadataStore(Protocol):
"""Per-engine storage for table-level metadata.

Holds one row per `resource_id`, keyed by the resource_id itself. The
canonical column shape is `(resource_id, schema, created_at,
updated_at)` where `schema` is a Frictionless Table Schema dict.

Each engine subpackage provides a concrete implementation
(e.g. `bigquery/metadata.py: BigQueryMetadataStore`) so the SQL
dialect, connection management, and column types stay engine-private.
The backend constructs its store in `__init__`, calls `initialize()`
once at startup to create the underlying table, and calls `upsert`
from `create()` whenever a caller declares a new resource.

Adding a new engine = drop a sibling `metadata.py` implementing this
Protocol; the backend wires it in by holding `self.metadata`.
"""

def initialize(self) -> None:
"""Create the metadata table if it doesn't exist. Idempotent."""

def insert(self, resource_id: str, schema: dict[str, Any]) -> None:
"""Insert a new metadata row for `resource_id`.

Sets `created_at` and `updated_at` to now. Fails if a row with
the same `resource_id` already exists — that's a real conflict
that callers should surface (a second `datastore_create` for an
already-declared resource).
"""

def update(self, resource_id: str, schema: dict[str, Any]) -> None:
"""Update the metadata row for `resource_id`.

Replaces `schema` and bumps `updated_at`; `created_at` is
preserved. Keyed on `resource_id`; no-op when the row is absent.
"""

def get(self, resource_id: str) -> dict[str, Any] | None:
"""Return the stored Frictionless schema for `resource_id`,
or `None` when no row exists."""

def delete(self, resource_id: str) -> None:
"""Remove the metadata row for `resource_id`. No-op when absent."""


class DatastoreBackend(ABC):
@abstractmethod
def initialize(self) -> None: