From ea0d1a1f9cc2b4c87a0dde446ca66ced9b3941b2 Mon Sep 17 00:00:00 2001 From: Sagar Ghimire Date: Sat, 23 May 2026 21:29:25 +0545 Subject: [PATCH 1/2] Fix: address CodeRabbit review (PR #1, 2026-05-23) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Drop JWT `jti` cache-key derivation in default_key_id — an unverified `jti` could collide a forged token with a real user's cached decision. Always sha256 the full credential. - CKAN provider: treat a corrupt cache entry as a miss (fall back to CKAN) instead of failing the request. - CKAN provider: store the hashed key in Decision.subject instead of the raw credential, so the cache never carries plaintext keys. - Postman generator: fix OUT_FILE typo (`ollection.json` → `postman/collection.json`), URL-encode query-string values, and fix the docstring's run-from-root path. - README: document `BIGQUERY_DATASET` in the env-var table. - Tests: the "without API key" assertions actually drop the Authorization header now (the conftest fixture installs one by default). Co-Authored-By: Claude Opus 4.7 --- README.md | 1 + datastore/auth/base.py | 28 +++++++----------- datastore/auth/ckan/provider.py | 26 +++++++++++++---- postman/generate_postman.py | 16 ++++++---- tests/auth/ckan/test_provider.py | 50 +++++++++++++++++++++++--------- tests/auth/jwt/test_provider.py | 6 ++-- tests/auth/test_base.py | 41 +++++++++++--------------- tests/test_datastore_create.py | 8 ++--- tests/test_datastore_search.py | 8 ++--- 9 files changed, 107 insertions(+), 77 deletions(-) diff --git a/README.md b/README.md index e0b8674..701a82d 100644 --- a/README.md +++ b/README.md @@ -221,6 +221,7 @@ Every entry below maps 1:1 to a field on `datastore.core.config.Config`. See [.e | `DATASTORE_ENGINE` | `bigquery` | Storage backend — must match a folder under `infrastructure/engines/`; validated at startup | | `SQL_FUNCTIONS_ALLOW_FILE` | _(empty)_ | Override path to the `datastore_search_sql` function allow-list; defaults to `/allowed_functions.txt` | | `BIGQUERY_PROJECT` | _(empty)_ | Google Cloud project ID. Required when `DATASTORE_ENGINE=bigquery`; unset → `/ready` returns 503 with a clear warning. | +| `BIGQUERY_DATASET` | _(empty)_ | BigQuery dataset that holds per-resource tables + the engine-managed `_table_metadata`. Required when `DATASTORE_ENGINE=bigquery`; unset → metadata store is disabled and writes fall through to placeholder mode. | | `BIGQUERY_CREDENTIALS` | _(empty)_ | Read-write service-account creds. Accepts a JSON blob (leading `{`), a path to a service-account JSON file, or empty (→ Application Default Credentials). | | `BIGQUERY_CREDENTIALS_RO` | _(empty)_ | Read-only service-account creds (same format). Empty → falls back to `BIGQUERY_CREDENTIALS` so single-credential deployments work. | | `BIGQUERY_USE_QUERY_CACHE` | `true` | Use BigQuery's 24h query-results cache on `datastore_search` / `datastore_search_sql` / `datastore_info`. Identical SELECTs return free + fast on cache hits. Set `false` to force a fresh scan. | diff --git a/datastore/auth/base.py b/datastore/auth/base.py index ab6747e..ee44ed2 100644 --- a/datastore/auth/base.py +++ b/datastore/auth/base.py @@ -11,13 +11,10 @@ from __future__ import annotations -import base64 import hashlib from dataclasses import dataclass from typing import Any, Protocol -import orjson - @dataclass(slots=True, frozen=True) class Decision: @@ -47,20 +44,15 @@ def key_id(self, credential: str) -> str: def default_key_id(credential: str) -> str: - """JWT `jti` if the credential is a JWT; sha256 prefix otherwise. - - Shared by providers that accept either opaque or JWT tokens. + """sha256 prefix of the full credential string. + + Security note: deliberately ignores any embedded JWT `jti` claim. An + unverified `jti` from the token's payload can be forged to collide + with a cached authorization decision for a different (verified) + token — the cache lookup is keyed before signature verification, so + a forged `jti:` lookup would return the cached decision for + the legitimate user with the same `jti`. Hashing the whole + credential keeps the cache identity tied to bytes-on-the-wire and + makes any collision strictly equivalent to a sha256 collision. """ - parts = credential.split(".") - if len(parts) == 3: - try: - segment = parts[1] - padded = segment + "=" * (-len(segment) % 4) - payload = orjson.loads(base64.urlsafe_b64decode(padded)) - if isinstance(payload, dict): - jti = payload.get("jti") - if isinstance(jti, str) and jti: - return f"jti:{jti}" - except (ValueError, TypeError, orjson.JSONDecodeError): - pass return "h:" + hashlib.sha256(credential.encode()).hexdigest()[:16] diff --git a/datastore/auth/ckan/provider.py b/datastore/auth/ckan/provider.py index 8232dc0..b052b7e 100644 --- a/datastore/auth/ckan/provider.py +++ b/datastore/auth/ckan/provider.py @@ -51,11 +51,22 @@ async def authorize( cached = await _safe_get(self._cache, cache_key) if cached is not None: - log.debug( - "ckan auth cache HIT scope=%s target=%s perm=%s", - scope, target, permission, - ) - return _decision_from_bytes(cached) + try: + decision = _decision_from_bytes(cached) + log.debug( + "ckan auth cache HIT scope=%s target=%s perm=%s", + scope, target, permission, + ) + return decision + except (AuthorizationError, ValueError, TypeError) as e: + # Treat a corrupt cache entry as a miss — fall through + # to CKAN. Blocking auth on a poisoned cache would be a + # self-inflicted outage. + log.warning( + "ckan auth cache entry malformed for scope=%s target=%s: " + "%s — falling back to CKAN", + scope, target, e, + ) log.debug( "ckan auth cache MISS scope=%s target=%s perm=%s -> CKAN", @@ -67,8 +78,11 @@ async def authorize( package_id=package_id, permission=permission, ) + # `subject` rides through the cache (orjson-serialised). Never + # store the raw credential there — use the same hash we already + # derive for the cache key. decision = Decision( - subject=credential, + subject=self.key_id(credential) if credential else None, resource=result.get("resource"), package=result.get("package"), ) diff --git a/postman/generate_postman.py b/postman/generate_postman.py index 7e41e8b..aca9144 100644 --- a/postman/generate_postman.py +++ b/postman/generate_postman.py @@ -7,10 +7,10 @@ Run from the repo root: - python scripts/generate_postman.py + python postman/generate_postman.py -Output: `postman/datastore-api.postman_collection.json` — import into -Postman / Insomnia and set the `apiKey` collection variable. +Output: `postman/collection.json` — import into Postman / Insomnia and +set the `apiKey` collection variable. """ from __future__ import annotations @@ -19,10 +19,11 @@ import uuid from pathlib import Path from typing import Any +from urllib.parse import quote REPO = Path(__file__).resolve().parent.parent SOURCE_DIR = REPO / "example_payload" -OUT_FILE = REPO / "ollection.json" +OUT_FILE = REPO / "postman" / "collection.json" # Each endpoint's HTTP method + folder description. Order here matches # the walkthrough flow: declare → write → inspect → query → cleanup. @@ -76,9 +77,14 @@ def _request_url(path: str, query: list[dict[str, str]] | None = None) -> dict[str, Any]: """Postman v2.1 structured URL — lets the Postman UI edit params.""" parts = path.strip("/").split("/") + # Values can be JSON-encoded (e.g. `filters={"col":"v"}`) or contain + # spaces / `=` / `&`; percent-encode so the `raw` URL parses cleanly. url: dict[str, Any] = { "raw": "{{baseUrl}}/" + "/".join(parts) + ( - "?" + "&".join(f"{q['key']}={q['value']}" for q in query) + "?" + "&".join( + f"{quote(q['key'], safe='')}={quote(q['value'], safe='')}" + for q in query + ) if query else "" ), "host": ["{{baseUrl}}"], diff --git a/tests/auth/ckan/test_provider.py b/tests/auth/ckan/test_provider.py index 85cbece..37e3853 100644 --- a/tests/auth/ckan/test_provider.py +++ b/tests/auth/ckan/test_provider.py @@ -102,8 +102,9 @@ def test_authorize_binds_credential_and_maps_response_to_decision() -> None: "permission": "read", } ] - # Subject preserves the raw credential (callers may attribute to it). - assert decision.subject == "token-xyz" + # `subject` carries a hash of the credential (raw key never leaves + # this provider). Same shape as `key_id`. + assert decision.subject == provider.key_id("token-xyz") assert decision.resource == {"id": "res-1", "package_id": "pkg-1"} assert decision.package == {"id": "pkg-1"} assert decision.claims is None @@ -212,30 +213,51 @@ def test_cache_failure_falls_through_to_ckan() -> None: assert len(ckan.calls) == 1 -def test_malformed_cache_entry_raises_authorization_error() -> None: - # A poisoned cache value (not a JSON dict) must not be silently accepted. +def test_malformed_cache_entry_falls_through_to_ckan() -> None: + # A poisoned cache value (not a JSON dict) is treated as a miss. + # Blocking auth on a corrupt cache entry would be a self-inflicted + # outage; we log + re-query CKAN instead. + ckan = FakeCKAN() cache = InMemoryCache() - provider = _provider(cache=cache) + provider = _provider(ckan=ckan, cache=cache) cache_key = ( f"auth:ckan:{provider.key_id('tok')}:res:res-1:read" ) asyncio.run(cache.set(cache_key, orjson.dumps("not-a-dict"), 60)) - with pytest.raises(AuthorizationError, match="malformed"): - asyncio.run(provider.authorize( - credential="tok", resource_id="res-1", package_id=None, permission="read", - )) + decision = asyncio.run(provider.authorize( + credential="tok", resource_id="res-1", package_id=None, permission="read", + )) + # Fell back to CKAN and got the canned decision. + assert decision.resource == {"id": "res-1", "package_id": "pkg-1"} + assert len(ckan.calls) == 1 -# --- key derivation + name -------------------------------------------------- +def test_subject_in_cached_decision_is_hashed_not_raw_credential() -> None: + # Security: the raw credential must never end up in the cache. + # `Decision.subject` is what gets serialised — store the hash. + ckan = FakeCKAN() + provider = _provider(ckan=ckan, cache=InMemoryCache()) + + decision = asyncio.run(provider.authorize( + credential="raw-api-key-do-not-leak", + resource_id="res-1", package_id=None, permission="read", + )) + + assert decision.subject is not None + assert "raw-api-key-do-not-leak" not in decision.subject + assert decision.subject.startswith("h:") -def test_key_id_uses_jti_for_jwt_credentials() -> None: - token = jwt.encode({"sub": "u", "jti": "tok-42"}, "k", algorithm="HS256") - assert _provider().key_id(token) == "jti:tok-42" + +# --- key derivation + name -------------------------------------------------- -def test_key_id_falls_back_to_sha256_for_opaque_credentials() -> None: +def test_key_id_hashes_credentials_regardless_of_jwt_shape() -> None: + # JWTs and opaque tokens both go through sha256 — never trust an + # unverified JWT claim for cache identity. + jwt_tok = jwt.encode({"sub": "u", "jti": "tok-42"}, "k", algorithm="HS256") + assert _provider().key_id(jwt_tok).startswith("h:") assert _provider().key_id("opaque-api-key").startswith("h:") diff --git a/tests/auth/jwt/test_provider.py b/tests/auth/jwt/test_provider.py index 446df03..b415556 100644 --- a/tests/auth/jwt/test_provider.py +++ b/tests/auth/jwt/test_provider.py @@ -105,10 +105,12 @@ def test_garbled_token_raises_authorization_error() -> None: _authorize(_provider(), "not.a.real.jwt") -def test_key_id_uses_jti_for_caching() -> None: +def test_key_id_hashes_full_token() -> None: + # Cache identity is sha256-of-credential — unverified JWT claims + # (like `jti`) are never used for the cache key. provider = _provider() token = jwt.encode({"sub": "u", "jti": "tok-1"}, SECRET, algorithm="HS256") - assert provider.key_id(token) == "jti:tok-1" + assert provider.key_id(token).startswith("h:") def test_provider_name_is_jwt() -> None: diff --git a/tests/auth/test_base.py b/tests/auth/test_base.py index 4c981e9..36728de 100644 --- a/tests/auth/test_base.py +++ b/tests/auth/test_base.py @@ -1,12 +1,10 @@ -"""`Decision` shape + `default_key_id` JWT/opaque handling.""" +"""`Decision` shape + `default_key_id` always-sha256 behaviour.""" from __future__ import annotations -import base64 import hashlib import jwt -import orjson import pytest from datastore.auth.base import Decision, default_key_id @@ -19,36 +17,31 @@ def test_decision_defaults_are_all_none() -> None: assert d.package is None -def test_default_key_id_extracts_jti_from_jwt() -> None: +def test_default_key_id_hashes_full_credential_even_for_jwt() -> None: + # Security: never derive cache identity from unverified JWT claims + # (a forged `jti` could collide with a verified user's cache entry). + # The full credential bytes always go through sha256. token = jwt.encode({"sub": "u", "jti": "abc123"}, "k", algorithm="HS256") - assert default_key_id(token) == "jti:abc123" - - -def test_default_key_id_falls_back_to_sha256_for_jwt_without_jti() -> None: - token = jwt.encode({"sub": "u"}, "k", algorithm="HS256") expected = "h:" + hashlib.sha256(token.encode()).hexdigest()[:16] assert default_key_id(token) == expected -def test_default_key_id_falls_back_to_sha256_for_opaque_token() -> None: +def test_default_key_id_hashes_opaque_token() -> None: token = "opaque-token-no-dots" expected = "h:" + hashlib.sha256(token.encode()).hexdigest()[:16] assert default_key_id(token) == expected -def test_default_key_id_ignores_non_string_jti() -> None: - # Hand-craft a JWT-shaped payload with a numeric `jti` — fall back to sha256. - payload = base64.urlsafe_b64encode(orjson.dumps({"jti": 12345})).rstrip(b"=").decode() - token = f"hdr.{payload}.sig" - assert default_key_id(token).startswith("h:") - +def test_two_different_jwts_with_same_jti_get_different_cache_keys() -> None: + # The whole point of dropping the jti optimisation: A and B both + # claim `jti=shared` but were signed differently. Their cache keys + # must NOT collide. + a = jwt.encode({"sub": "a", "jti": "shared"}, "key-1", algorithm="HS256") + b = jwt.encode({"sub": "b", "jti": "shared"}, "key-2", algorithm="HS256") + assert default_key_id(a) != default_key_id(b) -def test_default_key_id_handles_malformed_jwt_segment() -> None: - # Three-segment string but middle segment is not valid base64/json. - assert default_key_id("hdr.@@@.sig").startswith("h:") - -@pytest.mark.parametrize("token", ["", "a", "a.b", "a.b.c.d"]) -def test_default_key_id_for_non_three_segment_inputs(token: str) -> None: - # Anything that isn't a 3-part JWT → hashed. - assert default_key_id(token).startswith("h:") +@pytest.mark.parametrize("token", ["", "a", "a.b", "a.b.c", "a.b.c.d"]) +def test_default_key_id_is_sha256_for_any_input_shape(token: str) -> None: + expected = "h:" + hashlib.sha256(token.encode()).hexdigest()[:16] + assert default_key_id(token) == expected diff --git a/tests/test_datastore_create.py b/tests/test_datastore_create.py index 5b6b249..91c6f8d 100644 --- a/tests/test_datastore_create.py +++ b/tests/test_datastore_create.py @@ -222,10 +222,10 @@ def test_create_without_api_key_returns_403( but writes always require an authenticated user — short-circuit with 403 before CKAN is even called.""" before = fake_ckan.authorize_calls - response = client.post( - CREATE_URL, json=_valid_payload_with_resource_id(), - headers={"Authorization": ""}, - ) + # Drop the default Authorization header the conftest sets — we + # want a real "no header" request, not "header with empty value". + client.headers.pop("Authorization", None) + response = client.post(CREATE_URL, json=_valid_payload_with_resource_id()) assert response.status_code == 403 assert response.json()["error"]["__type"] == "Authorization Error" # CKAN never sees the request — we reject before delegating. diff --git a/tests/test_datastore_search.py b/tests/test_datastore_search.py index f3ecad9..93a5b36 100644 --- a/tests/test_datastore_search.py +++ b/tests/test_datastore_search.py @@ -253,10 +253,10 @@ def test_anonymous_read_calls_ckan_and_succeeds( visibility; on the FakeCKAN (no deny-list, no visibility flags) that succeeds, so the request returns 200.""" before = fake_ckan.authorize_calls - response = client.get( - SEARCH_URL, params=_params(), - headers={"Authorization": ""}, - ) + # Drop the default Authorization header the conftest sets — we + # want a real "no header" request, not "header with empty value". + client.headers.pop("Authorization", None) + response = client.get(SEARCH_URL, params=_params()) assert response.status_code == 200 # Confirms the auth path actually reached CKAN (not short-circuited). assert fake_ckan.authorize_calls - before == 1 From a7339f81477d4d85357974e2a133f89e53a1142a Mon Sep 17 00:00:00 2001 From: Sagar Ghimire Date: Tue, 26 May 2026 14:42:46 +0545 Subject: [PATCH 2/2] Feat (dump): /datastore/dump/ with 302-or-stream-concat download MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements `GET /datastore/dump/{resource_id}?format=csv|ndjson|parquet` as a single user-facing URL that yields one file regardless of how BigQuery shards the underlying EXPORT DATA job. Pipeline (engine): 1. Read `table.modified`, derive a content-addressed prefix `dumps///` where rev = hex(microsec epoch). 2. List GCS at that prefix — non-empty → cache hit, skip the export. 3. On miss, submit `EXPORT DATA` (Parquet → single URI, CSV/NDJSON → wildcard URI so BQ shards >1 GB exports). 4. After a successful extract, sweep stale revisions for the (rid, fmt) pair — storage is bounded to one rev per format. 5. Generate V4 signed URLs with `response-content-disposition` so the downloaded filename is `.` regardless of GCS object name. Why asyncio (and not threads / sync httpx): - **One URL → one file** is the user-visible contract. For shards >1 GB BQ refuses single-file CSV/NDJSON, so we have to stream-concat on the server. The byte path is GCS → us → client. - With sync httpx + StreamingResponse FastAPI would offload each shard read to the threadpool. With multi-GB dumps that means a worker thread held for the entire transfer — a handful of concurrent dumps starves the pool and blocks unrelated requests. - Async httpx + `aiter_bytes(64 KiB)` keeps the whole flow on the event loop: ~64 KiB resident per active dump, zero threads, and client disconnect cancels the coroutine which propagates through httpx to release the GCS connection immediately. - Same reasoning drives `asyncio.to_thread(job.reload)` + `await asyncio.sleep(...)` in the engine's poll loop — the worker is held only for the brief `reload` round-trip, not during the wait. CSV stream-concat strips the duplicate header off non-first shards via `_skip_first_line` (memory bound: header bytes + one chunk). NDJSON is pure byte concatenation. Parquet >1 GB is refused upstream with 413 (footers can't be concatenated); caller switches to CSV/NDJSON. IAM follows the ro-for-reading, rw-for-writing split documented in CLAUDE.md §5.3 — `BIGQUERY_CREDENTIALS_RO` for `get_table` + `list_blobs` cache lookup, `BIGQUERY_CREDENTIALS` for `EXPORT DATA` + post-extract list + signing + GC. Includes 592-line test suite covering single-shard redirect, multi-shard stream-concat (CSV header-dedup, NDJSON concat), cache hit/miss, stale-rev GC, Parquet >1 GB rejection, and IAM split. Co-Authored-By: Claude Opus 4.7 (1M context) --- .env.example | 5 +- CLAUDE.md | 48 +- README.md | 18 +- datastore/api/endpoints/dump.py | 67 ++ datastore/api/routes.py | 5 +- datastore/core/config.py | 14 + datastore/core/exceptions.py | 13 +- datastore/infrastructure/engines/base.py | 5 + .../engines/bigquery/backend.py | 284 +++++++++ .../infrastructure/engines/bigquery/client.py | 27 +- datastore/services/dump.py | 84 +++ datastore/services/read.py | 18 +- datastore/services/write.py | 15 +- pyproject.toml | 1 + tests/conftest.py | 38 +- tests/test_datastore_dump.py | 592 ++++++++++++++++++ 16 files changed, 1196 insertions(+), 38 deletions(-) create mode 100644 datastore/api/endpoints/dump.py create mode 100644 datastore/services/dump.py create mode 100644 tests/test_datastore_dump.py diff --git a/.env.example b/.env.example index 3fc27bc..bbad1d7 100644 --- a/.env.example +++ b/.env.example @@ -16,10 +16,9 @@ BIGQUERY_PROJECT= BIGQUERY_DATASET= BIGQUERY_CREDENTIALS= BIGQUERY_CREDENTIALS_RO= -# Use BigQuery's built-in 24h query-results cache on read paths -# (datastore_search / datastore_search_sql / datastore_info). Identical -# SELECTs return free + fast on cache hits. False = force fresh scan. BIGQUERY_USE_QUERY_CACHE=true +BIGQUERY_EXPORT_BUCKET= +BIGQUERY_EXPORT_URL_EXPIRY_HOURS=1 SQL_FUNCTIONS_ALLOW_FILE= # --- Auth --- diff --git a/CLAUDE.md b/CLAUDE.md index 8dc3485..56ad551 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -424,14 +424,60 @@ Each endpoint takes a single `ContextDep`. The handler calls `context.authorize( | GET | `/api/3/action/datastore_search` | **implemented** (streaming) | `DatastoreSearchRequest` | `DatastoreSearchResponse` | | GET | `/api/3/action/datastore_search_sql` | **implemented** (streaming) | `DatastoreSearchSQLRequest` | `DatastoreSearchResponse` | | GET | `/api/3/action/datastore_info` | **implemented** | `DatastoreInfoRequest` | `DatastoreInfoResponse` | +| GET | `/datastore/dump/{resource_id}` | **implemented** | `format=csv\|ndjson\|parquet` | 302 → GCS *or* streaming body (see §5.3) | -The BigQuery engine is wired end-to-end: DDL, MERGE-based upsert, DML delete, parameterised search, `_table_metadata` for Frictionless schema + unique_key round-trip, and a row-count fast path via `INFORMATION_SCHEMA.TABLE_STORAGE`. The DuckLake engine is the next concrete adapter — see §7. +The BigQuery engine is wired end-to-end: DDL, MERGE-based upsert, DML delete, parameterised search, `_table_metadata` for Frictionless schema + unique_key round-trip, a row-count fast path via `INFORMATION_SCHEMA.TABLE_STORAGE`, and `EXPORT DATA`-backed dump with `table.modified`-keyed GCS caching. The DuckLake engine is the next concrete adapter — see §7. `datastore_create` accepts two shapes: - `resource_id` — table name only. Works under any `AUTH_TYPE`. - `resource` (dict) — calls `ckan.resource_create(...)` first to materialise a CKAN resource, then writes the datastore table. **Only valid under `AUTH_TYPE=ckan`**; the endpoint rejects this shape with a `Validation Error` under JWT / anonymous since there's no CKAN to land it. +### 5.3 `GET /datastore/dump/{resource_id}` + +Full-table download, **one URL → one file** from the caller's point of view. Bytes never sit in API memory — small dumps redirect to GCS, large dumps stream-concat through async httpx. + +Pipeline: + +1. **Resolve cache key** — read `table.modified` from BigQuery, compute `rev = hex(microsec_epoch(modified))`, prefix becomes `dumps///`. +2. **GCS cache lookup** — `list_blobs(prefix=…)`. Non-empty → skip steps 3-5; log `cache HIT`. +3. **Submit `EXPORT DATA`** — Parquet → single-file URI `gs:///.parquet`; CSV/NDJSON → wildcard URI `gs:///_*.` so BigQuery shards >1 GB exports. The SELECT casts `TIMESTAMP` + `DATETIME` columns to ISO 8601 for CSV/NDJSON; Parquet keeps native types. +4. **Poll non-blockingly** — `await asyncio.to_thread(job.reload)` + `await asyncio.sleep(_DUMP_POLL_INTERVAL_SECONDS)` between iterations. Worker thread is held only during the brief `reload` call, not the wait. +5. **GC stale revisions** — after a successful extract, sweep `dumps///` and delete any blob that doesn't start with the current `prefix`. Best-effort, failures logged. Storage stays bounded to one rev per `(rid, fmt)`. +6. **Sign URLs** — V4 signed URLs with `response-content-disposition: attachment; filename="."` (single shard) or `_NN.` (multi-shard, 1-indexed, zero-padded). Signing offloaded to a thread (IAM round-trip under workload identity). +7. **Return**: + - 1 URL → `RedirectResponse(302)`. Bytes flow GCS → client; server is **out of the byte path**. + - N URLs → `StreamingResponse` over `services.dump.stream_*_shards` (async httpx, 64 KiB chunks, serial shard walk, CSV header-dedup via `_skip_first_line`, NDJSON pure byte-concat). + +Per-stream resource profile (multi-shard branch): ~64 KiB resident memory, **0** worker threads, byte-copy CPU only, async cancellation propagates from client disconnect → httpx → GCS connection released. + +Errors: +- Parquet >1 GB → `EXPORT DATA` job fails with a "single URI / wildcard" message; classifier in `_is_export_too_large` flips it to `PayloadTooLargeError` (413). Caller switches to `format=csv` or `format=ndjson`. +- Any other BigQuery / GCS failure → `ServerError` (500) with the upstream message. +- `BIGQUERY_EXPORT_BUCKET` unset → `ServerError` at request time (the lifespan doesn't fail-fast because dump is an optional capability). + +Required IAM. Dump follows a strict **ro for reading, rw for writing/updating** model — see [bigquery/client.py](datastore/infrastructure/engines/bigquery/client.py) `load_credentials` + `_build_bq_client` / `_build_storage_client` on the backend: + +| Step | Identity | Why | +|---|---|---| +| `get_table` | RO BQ (`self.client`) | Reading BigQuery metadata. | +| `list_blobs` cache lookup | RO GCS | Reading GCS objects. | +| `client.query("EXPORT DATA …")` | RW BQ (built on demand) | BigQuery writes shards to GCS under this SA's identity — it's a write op even though the SQL surface is `SELECT`. | +| Post-extract `list_blobs` refresh | RW GCS | Blobs are passed straight to `generate_signed_url` next; we want them bound to the rw client. | +| `delete` (GC) | RW GCS | Writing/deleting objects. | +| `generate_signed_url` | RW GCS | Under workload identity this calls IAM `signBlob`, which typically only the rw SA holds via `iam.serviceAccountTokenCreator`. | + +Concrete perm sets: + +- **RO SA** (`BIGQUERY_CREDENTIALS_RO`) — `bigquery.tables.get` + `storage.objects.list`. +- **RW SA** (`BIGQUERY_CREDENTIALS`) — `bigquery.jobs.create` + `bigquery.tables.export` + `bigquery.tables.getData` + `storage.objects.{create,list,delete}` + `iam.serviceAccountTokenCreator`. + +A single SA works if both perm sets land on the same identity — `BIGQUERY_CREDENTIALS_RO` empty falls through to ADC; same env var can drive both. `_build_bq_client` and `_build_storage_client` on the backend are deliberately small + stub-friendly so tests inject mocks without monkey-patching `google.cloud.*` globally. + +A 24h object-lifecycle rule on the bucket is a useful belt-and-braces: the engine GCs older revs already, but lifecycle catches anything stranded by a crashed dump. + +The GCS client is built with the same credentials as the BigQuery client for the active engine mode (`load_credentials(config, mode)` in [bigquery/client.py](datastore/infrastructure/engines/bigquery/client.py)). Without this shim, a service-account JSON loaded via `BIGQUERY_CREDENTIALS_RO` would drive BigQuery but `storage.Client(...)` would silently fall back to ADC — a near-invisible identity split. Workload identity / `GOOGLE_APPLICATION_CREDENTIALS`-style setups still work because `load_credentials` returns `None` for ADC and the storage client follows the same default-credentials path. + --- ## 6. Request / Response Contracts diff --git a/README.md b/README.md index 701a82d..2bc68d6 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,8 @@ datastore/ │ ├── error_handlers.py # Exception handlers (APIError → CKAN error envelope) │ └── endpoints/ # Route handlers, one file per resource group │ ├── health.py # /, /health, /ready -│ └── datastore.py # /api/3/action/datastore_* +│ ├── datastore.py # /api/3/action/datastore_* +│ └── dump.py # /datastore/dump/ (302 single / stream multi) │ ├── auth/ # Pluggable auth providers — one subpackage per type │ ├── base.py # AuthProvider Protocol + Decision dataclass + @@ -60,7 +61,9 @@ datastore/ │ ├── write.py # create / upsert / delete orchestration │ ├── read.py # search / search_sql orchestration (engine call, │ │ # format dispatch, pagination links) -│ └── streaming.py # per-format byte-yielding writers used by read.py +│ ├── streaming.py # per-format byte-yielding writers used by read.py +│ └── dump.py # multi-shard stream-concat over async httpx +│ # (drives /datastore/dump for >1 GB CSV/NDJSON) │ └── infrastructure/ # Adapters to outside systems ├── cache.py # InMemoryCache + RedisCache (CachePort protocol) @@ -124,7 +127,14 @@ What's shipped and what's next. Tick each box as the change set lands. - `datastore_search_sql` (sqlglot parses tables + functions; per-table CKAN authorize; per-engine function allow-list) - `datastore_info` (column schema + free-form `meta` dict) -- [x] Health endpoints `/`, `/health`, `/ready` returning the CKAN envelope shape. +- [x] `GET /datastore/dump/?format=csv|ndjson|parquet` — full-table download + via BigQuery `EXPORT DATA`. **1 shard** (≤1 GB CSV/NDJSON, or any Parquet ≤1 GB): + 302 to a GCS signed URL (server out of the byte path). **N shards** (>1 GB CSV/NDJSON): + server stream-concats shards via async httpx (~64 KiB peak memory, no threadpool). + Parquet >1 GB returns 413 (parquet shards can't be byte-concatenated). Results are + cached in GCS keyed by `table.modified`; unchanged tables skip the extract entirely, + and stale revisions are GC'd on the next cache miss so storage stays bounded to one + rev per `(resource_id, format)`. `/ready` builds the rw + ro engine instances during lifespan and probes `engine.healthcheck()` on each — 503 with a `Service Unavailable` envelope if either fails (so k8s pulls the pod from the Service). @@ -225,6 +235,8 @@ Every entry below maps 1:1 to a field on `datastore.core.config.Config`. See [.e | `BIGQUERY_CREDENTIALS` | _(empty)_ | Read-write service-account creds. Accepts a JSON blob (leading `{`), a path to a service-account JSON file, or empty (→ Application Default Credentials). | | `BIGQUERY_CREDENTIALS_RO` | _(empty)_ | Read-only service-account creds (same format). Empty → falls back to `BIGQUERY_CREDENTIALS` so single-credential deployments work. | | `BIGQUERY_USE_QUERY_CACHE` | `true` | Use BigQuery's 24h query-results cache on `datastore_search` / `datastore_search_sql` / `datastore_info`. Identical SELECTs return free + fast on cache hits. Set `false` to force a fresh scan. | +| `BIGQUERY_EXPORT_BUCKET` | _(empty)_ | GCS bucket name (no `gs://` prefix) that `/datastore/dump/` writes `EXPORT DATA` shards into. Required when the dump endpoint is in use. **Credential model: ro reads, rw writes.** RO SA (`BIGQUERY_CREDENTIALS_RO`) does the BigQuery `get_table` and the initial GCS `list_blobs` cache lookup. RW SA (`BIGQUERY_CREDENTIALS`) runs `EXPORT DATA` (it writes shards under its own identity), does GC `delete`, and signs URLs. **RO SA perms:** `bigquery.tables.get` + `storage.objects.list`. **RW SA perms:** `bigquery.jobs.create` + `bigquery.tables.export` + `bigquery.tables.getData` + `storage.objects.{create,list,delete}` + `iam.serviceAccountTokenCreator` (for V4 signing under workload identity). A 24h object-lifecycle rule on the bucket is recommended as a safety net. | +| `BIGQUERY_EXPORT_URL_EXPIRY_HOURS` | `1` | Signed-URL TTL for dump manifest entries (hours). | | `REDIS_URL` | _(empty)_ | Redis URL for cache; empty → in-process `InMemoryCache` | | `CKAN_URL` | _(empty)_ | Base URL of the CKAN instance (required when `AUTH_TYPE=ckan`) | | `HTTP_TIMEOUT_SECONDS` | `10` | Timeout for outbound CKAN calls (seconds) | diff --git a/datastore/api/endpoints/dump.py b/datastore/api/endpoints/dump.py new file mode 100644 index 0000000..1da8eec --- /dev/null +++ b/datastore/api/endpoints/dump.py @@ -0,0 +1,67 @@ +"""`GET /datastore/dump/{resource_id}` — single download for a table. + +Behaviour by shard count (decided by BigQuery from the export size): + + - **1 shard** (≤ 1 GB, or any-size Parquet): 302 redirect to the + GCS signed URL. Zero server bandwidth — bytes go GCS → client. + - **N shards** (>1 GB CSV/NDJSON): `StreamingResponse` over + `services.dump.stream_*_shards`, which pulls each shard from GCS + via async httpx and byte-forwards (CSV header-dedup; NDJSON pure + concat). Memory ≈ one chunk in flight; no threadpool consumption. + +Parquet >1 GB is refused upstream with 413 (parquet shards can't be +byte-concatenated). Caller picks CSV/NDJSON. +""" + +from __future__ import annotations + +from typing import Annotated, Literal + +from fastapi import APIRouter, Query +from starlette.responses import RedirectResponse, StreamingResponse + +from datastore.api.context import Context +from datastore.infrastructure.engines import get_datastore_engine +from datastore.services.dump import stream_csv_shards, stream_ndjson_shards + +DumpFormat = Literal["csv", "ndjson", "parquet"] + +_MEDIA_TYPE: dict[str, str] = { + "csv": "text/csv", + "ndjson": "application/x-ndjson", + "parquet": "application/vnd.apache.parquet", +} + +router = APIRouter(tags=["dump"]) + + +@router.get("/datastore/dump/{resource_id}") +async def dump( + context: Context, + resource_id: str, + fmt: Annotated[DumpFormat, Query(alias="format")] = "csv", +): + await context.authorize(resource_id=resource_id, permission="read") + engine = get_datastore_engine(context, mode="ro") + + urls = await engine.dump(resource_id, fmt) + + if len(urls) == 1: + return RedirectResponse(url=urls[0], status_code=302) + + if fmt == "csv": + body = stream_csv_shards(urls) + elif fmt == "ndjson": + body = stream_ndjson_shards(urls) + else: # pragma: no cover — Parquet never returns >1 shard + raise RuntimeError(f"unexpected multi-shard format: {fmt}") + + return StreamingResponse( + body, + media_type=_MEDIA_TYPE[fmt], + headers={ + "Content-Disposition": ( + f'attachment; filename="{resource_id}.{fmt}"' + ), + }, + ) diff --git a/datastore/api/routes.py b/datastore/api/routes.py index da46d21..36ddac9 100644 --- a/datastore/api/routes.py +++ b/datastore/api/routes.py @@ -2,10 +2,11 @@ from fastapi import APIRouter -from datastore.api.endpoints import datastore, health +from datastore.api.endpoints import datastore, dump, health api_router = APIRouter() api_router.include_router(health.welcome_router) api_router.include_router(health.probe_router) api_router.include_router(health.probe_router, prefix="/api/3/action") -api_router.include_router(datastore.router, prefix="/api/3/action") \ No newline at end of file +api_router.include_router(datastore.router, prefix="/api/3/action") +api_router.include_router(dump.router) \ No newline at end of file diff --git a/datastore/core/config.py b/datastore/core/config.py index 788f0e6..1854f9e 100644 --- a/datastore/core/config.py +++ b/datastore/core/config.py @@ -129,6 +129,20 @@ def _check_engine_available(cls, v: str) -> str: "force a fresh scan in tests." ), ) + BIGQUERY_EXPORT_BUCKET: str = Field( + default="", + description=( + "GCS bucket name (no `gs://` prefix) that `/datastore/dump/` " + ), + ) + BIGQUERY_EXPORT_URL_EXPIRY_HOURS: int = Field( + default=1, + ge=1, + le=168, + description=( + "Signed-URL TTL for dump manifest entries (hours). Defaults to 1h." + ), + ) # Per-row system columns INCLUDE_UPDATED_AT: bool = Field( diff --git a/datastore/core/exceptions.py b/datastore/core/exceptions.py index 9636ffe..09e6ef4 100644 --- a/datastore/core/exceptions.py +++ b/datastore/core/exceptions.py @@ -38,6 +38,17 @@ class ConflictError(APIError): type_label = "Conflict Error" +class PayloadTooLargeError(APIError): + """Raised when `/datastore/dump/?format=parquet` exceeds + BigQuery's 1 GB single-file limit. CSV / NDJSON dumps stitch + multiple shards into one download, but Parquet shards can't be + byte-concatenated (each shard has its own footer), so big tables + have to use CSV / NDJSON instead.""" + + status_code = 413 + type_label = "Payload Too Large" + + class ServerError(APIError): status_code = 500 type_label = "Internal Error" @@ -50,7 +61,7 @@ class ServerError(APIError): 404: "Not Found Error", 405: "Not Found Error", 409: "Conflict Error", - 413: "Validation Error", + 413: "Payload Too Large", 422: "Validation Error", 501: "Not Implemented", } diff --git a/datastore/infrastructure/engines/base.py b/datastore/infrastructure/engines/base.py index 9949ff4..7e1fc61 100644 --- a/datastore/infrastructure/engines/base.py +++ b/datastore/infrastructure/engines/base.py @@ -170,6 +170,11 @@ def delete( def info(self, resource_id: str) -> InfoResult: """Return table metadata: column schema + free-form `meta` dict.""" + @abstractmethod + async def dump(self, resource_id: str, fmt: str) -> list[str]: + """Download a table as CSV/NDJSON/Parquet. + """ + @abstractmethod def get_columns(self, resource_id: str) -> list[str]: """Return column names for a table (needed for full-text search across all columns).""" diff --git a/datastore/infrastructure/engines/bigquery/backend.py b/datastore/infrastructure/engines/bigquery/backend.py index 1c6cfa6..1b7f931 100644 --- a/datastore/infrastructure/engines/bigquery/backend.py +++ b/datastore/infrastructure/engines/bigquery/backend.py @@ -23,6 +23,7 @@ from datastore.core.config import Config from datastore.core.exceptions import ( NotFoundError, + PayloadTooLargeError, ServerError, ValidationError, ) @@ -1041,6 +1042,229 @@ def _count_rows(self, resource_id: str) -> int: return 0 return int(rows[0]["n"]) + async def dump(self, resource_id: str, fmt: str) -> list[str]: + """Submit `EXPORT DATA`; poll non-blockingly; return signed URLs. + + - CSV/NDJSON: wildcard URI → BigQuery shards above 1 GB. + - Parquet: single-file URI; >1 GB → 413, switch format. + - Cache key = `table.modified`; unchanged tables skip the extract. + - Older revisions are GC'd on cache miss. + - All BQ + GCS calls are offloaded via `asyncio.to_thread`; the + poll loop releases the worker between `job.reload` calls. + """ + import asyncio + from datetime import timedelta + from uuid import uuid4 + + if self.client is None: + return [] + + bucket = ( + getattr(self.config, "BIGQUERY_EXPORT_BUCKET", "") or "" + ).strip() + if not bucket: + raise ServerError( + "BIGQUERY_EXPORT_BUCKET is not configured — " + "/datastore/dump cannot run without an export bucket." + ) + + if self.metadata is not None and self.metadata.get(resource_id) is None: + raise NotFoundError( + f"resource {resource_id!r} is not declared; nothing to dump" + ) + + from google.cloud import bigquery + + # Clients: ro for reads (BQ get_table, GCS list); rw for the + # rest (BQ EXPORT DATA writes shards under its identity; GCS + # delete + sign). One bucket handle per client. + rw_bq = self._build_bq_client("rw") + ro_gcs = self._build_storage_client("ro").bucket(bucket) + rw_gcs = self._build_storage_client("rw").bucket(bucket) + + table_ref = bigquery.TableReference.from_string( + f"{self.config.BIGQUERY_PROJECT}" + f".{self.config.BIGQUERY_DATASET}.{resource_id}" + ) + try: + table = await asyncio.to_thread(self.client.get_table, table_ref) + except Exception as e: + raise ServerError( + f"BigQuery get_table failed for resource {resource_id!r}: {e}" + ) from e + + rev = ( + f"{int(table.modified.timestamp() * 1_000_000):x}" + if table.modified is not None + else uuid4().hex[:12] + ) + ext = _FMT[fmt]["ext"] + prefix = f"dumps/{resource_id}/{fmt}/{rev}" + uri = ( + f"gs://{bucket}/{prefix}.{ext}" + if fmt == "parquet" + else f"gs://{bucket}/{prefix}_*.{ext}" + ) + + async def _list(b: Any, p: str) -> list[Any]: + return sorted( + await asyncio.to_thread(lambda: list(b.list_blobs(prefix=p))), + key=lambda x: x.name, + ) + + blobs = await _list(ro_gcs, prefix) + + if not blobs: + # `header=true` is the documented default for CSV but some + # client versions / project configs treat it as false; be + # explicit so the column names always land in shard 0. + # NDJSON / Parquet ignore the option. + extra_opts = ", header=true" if fmt == "csv" else "" + sql = ( + f"EXPORT DATA OPTIONS(" + f"uri='{uri}', format='{_FMT[fmt]['bq']}', overwrite=true" + f"{extra_opts}" + ") AS " + f"SELECT {_build_export_select(table.schema, fmt)} FROM " + f"`{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}` " + f"ORDER BY `_id`" + ) + try: + job = await asyncio.to_thread(rw_bq.query, sql) + except Exception as e: + raise ServerError( + f"BigQuery EXPORT DATA submit failed for resource " + f"{resource_id!r}: {e}" + ) from e + + while True: + await asyncio.to_thread(job.reload) + if job.state == "DONE": + break + await asyncio.sleep(_DUMP_POLL_INTERVAL_SECONDS) + + if job.error_result: + err_msg = (job.error_result or {}).get("message", "") + if _is_export_too_large(RuntimeError(err_msg)): + raise PayloadTooLargeError( + f"resource {resource_id!r} exceeds 1 GB after export " + f"as {fmt!r}; single-file download isn't possible. " + "Try `format=csv` or `format=ndjson` for sharded " + "multi-file downloads instead." + ) + raise ServerError( + f"BigQuery EXPORT DATA failed for resource " + f"{resource_id!r}: {err_msg}" + ) + + log.info( + "BigQuery dump cache MISS: resource=%s format=%s rev=%s", + resource_id, fmt, rev, + ) + blobs = await _list(rw_gcs, prefix) + if not blobs: + raise ServerError( + f"BigQuery EXPORT DATA wrote no shards for resource " + f"{resource_id!r}; check job logs." + ) + + # GC stale revisions under dumps///. Best-effort. + def _gc() -> int: + deleted = 0 + for old in rw_gcs.list_blobs(prefix=f"dumps/{resource_id}/{fmt}/"): + if old.name.startswith(prefix): + continue + try: + old.delete() + deleted += 1 + except Exception as gc_err: # noqa: BLE001 + log.warning("dump GC: failed to delete %s: %s", old.name, gc_err) + return deleted + + try: + gc_count = await asyncio.to_thread(_gc) + if gc_count: + log.info( + "BigQuery dump GC: resource=%s format=%s removed=%d", + resource_id, fmt, gc_count, + ) + except Exception as gc_err: # noqa: BLE001 + log.warning( + "BigQuery dump GC failed for resource=%s format=%s: %s", + resource_id, fmt, gc_err, + ) + else: + log.info( + "BigQuery dump cache HIT: resource=%s format=%s rev=%s shards=%d", + resource_id, fmt, rev, len(blobs), + ) + # Re-fetch via rw so the blobs we sign carry rw credentials + # (signing needs IAM signBlob under workload identity). + blobs = await _list(rw_gcs, prefix) + + expiry = timedelta( + hours=getattr(self.config, "BIGQUERY_EXPORT_URL_EXPIRY_HOURS", 1), + ) + + def _sign_all() -> list[str]: + out: list[str] = [] + for i, blob in enumerate(blobs): + filename = ( + f"{resource_id}.{ext}" + if len(blobs) == 1 + else f"{resource_id}_{i + 1:02d}.{ext}" + ) + out.append( + blob.generate_signed_url( + version="v4", + expiration=expiry, + method="GET", + response_disposition=f'attachment; filename="{filename}"', + ) + ) + return out + + return await asyncio.to_thread(_sign_all) + + def _build_bq_client(self, mode: str) -> Any: + """Construct an on-demand BigQuery client for `mode` ("ro" / "rw"). + + Used by the dump path's cache-miss branch to elevate to the rw + SA for `EXPORT DATA` while keeping the rest of the engine on + `self.client`. Tests stub this to inject mocks instead of + patching `google.cloud.bigquery` globally. + """ + from google.cloud import bigquery + + from datastore.infrastructure.engines.bigquery.client import ( + load_credentials, + ) + + creds = load_credentials(self.config, mode=mode) # type: ignore[arg-type] + kwargs: dict[str, Any] = {"project": self.config.BIGQUERY_PROJECT} + if creds is not None: + kwargs["credentials"] = creds + return bigquery.Client(**kwargs) + + def _build_storage_client(self, mode: str) -> Any: + """Construct an on-demand GCS client for `mode` ("ro" / "rw"). + + Lazy import keeps `google-cloud-storage` an optional dep — only + the dump path touches GCS, so test envs without the package + don't need to install it. Tests stub this to inject mocks. + """ + from google.cloud import storage + + from datastore.infrastructure.engines.bigquery.client import ( + load_credentials, + ) + + creds = load_credentials(self.config, mode=mode) # type: ignore[arg-type] + kwargs: dict[str, Any] = {"project": self.config.BIGQUERY_PROJECT} + if creds is not None: + kwargs["credentials"] = creds + return storage.Client(**kwargs) + def get_columns(self, resource_id: str) -> list[str]: """Return column names for a table. @@ -1194,3 +1418,63 @@ def _translate_bigquery_error( "bytes": "string", } + +# --- EXPORT DATA helpers ----------------------------------------------------- + +# Seconds between BigQuery job-status polls during a dump. Each poll +# is a quick metadata HTTP call (~tens of ms); between polls the worker +# thread is released so other requests can run. Bumping this down makes +# small jobs complete faster, bumping it up means fewer reload calls +# per job — 1 s is a safe middle. +_DUMP_POLL_INTERVAL_SECONDS = 1.0 + +# Per-format filename extension + BigQuery EXPORT DATA `format` value. +# BigQuery writes newline-delimited JSON to `.json` files; we keep that +# extension on the GCS object so clients see the file type they expect. +_FMT: dict[str, dict[str, str]] = { + "csv": {"ext": "csv", "bq": "CSV"}, + "ndjson": {"ext": "json", "bq": "JSON"}, + "parquet": {"ext": "parquet", "bq": "PARQUET"}, +} + + +def _build_export_select(schema: Any, fmt: str) -> str: + """SELECT column list for EXPORT DATA. + + Parquet preserves native logical types → `*` is enough. For CSV / + NDJSON, cast TIMESTAMP and DATETIME columns to ISO 8601 (BigQuery's + default text format uses a space separator and `UTC` suffix, which + most clients reject as non-ISO). DATE and TIME already serialise as + ISO and pass through. + """ + if fmt == "parquet": + return "*" + parts: list[str] = [] + for field in schema: + ftype = (field.field_type or "").upper() + if ftype == "TIMESTAMP": + # `%E*S` keeps all fractional seconds; trailing Z marks UTC. + parts.append( + f"FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%E*SZ', " + f"`{field.name}`, 'UTC') AS `{field.name}`" + ) + elif ftype == "DATETIME": + parts.append( + f"FORMAT_DATETIME('%Y-%m-%dT%H:%M:%E*S', `{field.name}`) " + f"AS `{field.name}`" + ) + else: + parts.append(f"`{field.name}`") + return ", ".join(parts) + + +def _is_export_too_large(exc: BaseException) -> bool: + """Does this BigQuery error look like ">1 GB single-file rejection"? + + BigQuery's exact wording shifts across SDK versions; both phrasings + we've seen contain `single URI` or `wildcard`. False negatives just + surface as a generic 500 instead of 413 — annoying but not silent. + """ + msg = str(exc).lower() + return "single uri" in msg or "wildcard" in msg + diff --git a/datastore/infrastructure/engines/bigquery/client.py b/datastore/infrastructure/engines/bigquery/client.py index 46d2d90..2e11e7a 100644 --- a/datastore/infrastructure/engines/bigquery/client.py +++ b/datastore/infrastructure/engines/bigquery/client.py @@ -24,22 +24,29 @@ def build_client(config: Config, mode: Mode) -> bigquery.Client: "BIGQUERY_PROJECT is required when DATASTORE_ENGINE=bigquery" ) - # Each mode reads its own credential variable independently. An - # empty RO credential falls through to ADC (Application Default - # Credentials) — never to the RW key, since that would silently - # give read paths write privileges and defeat the credential - # split. + creds = load_credentials(config, mode) + if creds is None: + return bigquery.Client(project=project) + return bigquery.Client(project=project, credentials=creds) + + +def load_credentials(config: Config, mode: Mode = "ro"): + """Resolve service-account credentials for this engine mode. + + Defaults to `ro` — the safer choice. Read-only paths (dump, search) + use this on their own; `build_client` passes `mode` explicitly when + constructing the rw client. An empty RO credential falls through + to ADC; it **never** falls back to the RW key, which would silently + give read paths write privileges and defeat the credential split. + """ creds_raw = ( config.BIGQUERY_CREDENTIALS_RO if mode == "ro" else config.BIGQUERY_CREDENTIALS ).strip() - if not creds_raw: - return bigquery.Client(project=project) - return bigquery.Client( - project=project, credentials=_credentials_from_raw(creds_raw) - ) + return None + return _credentials_from_raw(creds_raw) def _credentials_from_raw(raw: str): diff --git a/datastore/services/dump.py b/datastore/services/dump.py new file mode 100644 index 0000000..c9fb703 --- /dev/null +++ b/datastore/services/dump.py @@ -0,0 +1,84 @@ +"""Service for multi-shard streaming of `/datastore/dump/`. + +Single-shard exports are served as 302 redirects (bytes flow GCS → +client, never through us). When BigQuery shards an export (>1 GB +CSV/NDJSON), the endpoint falls back to **server-side stream-concat +through this module**: we fetch each shard from GCS via async httpx +and forward bytes to the client as a single download. + +Resource profile per active stream-concat dump: + - Memory: one ~64 KiB chunk in flight per active shard (we walk + shards serially, so peak ≈ one chunk). + - CPU: byte forwarding plus a single newline scan per CSV shard to + strip its header. Essentially zero. + - Threads: none — everything runs on the asyncio loop via httpx. + - Network: full dump size through our server (the unavoidable cost + of the "one URL → one file" contract). + +Parquet shards aren't supported here because their footers can't be +byte-concat'd; the engine refuses multi-shard Parquet at 1 GB. +""" + +from __future__ import annotations + +from collections.abc import AsyncIterator + +import httpx + +# Read-side network knobs. No total timeout (a multi-GB stream legitimately +# takes minutes); just a generous per-chunk read timeout so a dead GCS +# connection doesn't hang us forever. +_TIMEOUT = httpx.Timeout(connect=10.0, read=60.0, write=10.0, pool=10.0) +# Chunk size we pull from GCS / forward to the client. +_CHUNK_BYTES = 64 * 1024 + + +async def stream_csv_shards(urls: list[str]) -> AsyncIterator[bytes]: + """Stream-concat CSV shards. Header from the first shard only; the + first newline of each subsequent shard is dropped (BigQuery emits + a header row per shard when `header=true`).""" + async with httpx.AsyncClient(timeout=_TIMEOUT) as client: + for i, url in enumerate(urls): + async with client.stream("GET", url) as resp: + resp.raise_for_status() + if i == 0: + async for chunk in resp.aiter_bytes(_CHUNK_BYTES): + yield chunk + else: + async for chunk in _skip_first_line( + resp.aiter_bytes(_CHUNK_BYTES) + ): + yield chunk + + +async def stream_ndjson_shards(urls: list[str]) -> AsyncIterator[bytes]: + """Stream-concat NDJSON shards. Each shard is independent + newline-delimited JSON, so pure byte concatenation produces a + valid combined stream.""" + async with httpx.AsyncClient(timeout=_TIMEOUT) as client: + for url in urls: + async with client.stream("GET", url) as resp: + resp.raise_for_status() + async for chunk in resp.aiter_bytes(_CHUNK_BYTES): + yield chunk + + +async def _skip_first_line( + chunks: AsyncIterator[bytes], +) -> AsyncIterator[bytes]: + """Drop bytes up to and including the first `\\n`, then forward + the rest unchanged. Used to strip the duplicate CSV header on + non-first shards. Memory bound: bytes of the header line plus + one chunk.""" + pending = bytearray() + async for chunk in chunks: + pending.extend(chunk) + idx = pending.find(b"\n") + if idx >= 0: + yield bytes(pending[idx + 1:]) + pending.clear() + break + async for chunk in chunks: + yield chunk + + diff --git a/datastore/services/read.py b/datastore/services/read.py index 186bf7c..e261fd7 100644 --- a/datastore/services/read.py +++ b/datastore/services/read.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from collections.abc import Iterator from typing import TYPE_CHECKING, Any from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse @@ -65,7 +66,12 @@ async def search_datastore( ) engine = get_datastore_engine(context, mode="ro") - result = engine.search( + # Off the event loop — `engine.search` submits the BigQuery query + # and fetches the first page. The streaming writer below is a sync + # generator that Starlette runs in its threadpool, so subsequent + # page fetches also happen off the loop. + result = await asyncio.to_thread( + engine.search, resource_id=data_dict["resource_id"], filters=to_json_object(data_dict["filters"]), q=to_str_or_json_object(data_dict["q"]), @@ -144,7 +150,11 @@ async def search_sql_datastore( ) engine = get_datastore_engine(context, mode="ro") - result = engine.search_sql(sql=data_dict["sql"], limit=limit) + # Off the event loop — submitting the query + fetching the first + # page blocks; streaming writer below picks up the rest in threadpool. + result = await asyncio.to_thread( + engine.search_sql, sql=data_dict["sql"], limit=limit, + ) fields, _ = frictionless_schema_to_fields(result.schema) return stream_objects( help_url=request_url, @@ -182,7 +192,9 @@ async def info_datastore( path. """ engine = get_datastore_engine(context, mode="ro") - result = engine.info(resource_id=data_dict["resource_id"]) + result = await asyncio.to_thread( + engine.info, resource_id=data_dict["resource_id"], + ) fields, _ = frictionless_schema_to_fields(result.schema) return DatastoreInfoResponse.Result( meta=result.meta, diff --git a/datastore/services/write.py b/datastore/services/write.py index 5ccfdf3..6203ef1 100644 --- a/datastore/services/write.py +++ b/datastore/services/write.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from typing import TYPE_CHECKING, Any from datastore.infrastructure.engines import get_datastore_engine @@ -37,9 +38,11 @@ async def create_datastore( else: resource_id = resource - # TODO: placeholder engine call — replace once the real backend lands. engine = get_datastore_engine(context, mode="rw") - write_result = engine.create( + # Off the event loop — BigQuery's sync client would otherwise block + # every other request on this worker for the duration of the call. + write_result = await asyncio.to_thread( + engine.create, resource_id=resource_id, schema=schema, records=records, @@ -67,9 +70,9 @@ async def upsert_datastore( include_records = bool(data_dict.get("include_records", False)) include_total = bool(data_dict.get("include_total", False)) - # TODO: placeholder engine call — replace once the real backend lands. engine = get_datastore_engine(context, mode="rw") - write_result = engine.upsert( + write_result = await asyncio.to_thread( + engine.upsert, resource_id=resource_id, records=records, method=method, @@ -95,7 +98,9 @@ async def delete_datastore( fields = data_dict.get("fields") engine = get_datastore_engine(context, mode="rw") - engine.delete(resource_id=resource_id, filters=filters, fields=fields) + await asyncio.to_thread( + engine.delete, resource_id=resource_id, filters=filters, fields=fields, + ) return DatastoreDeleteResponse.Result( resource_id=resource_id, diff --git a/pyproject.toml b/pyproject.toml index 8f2d5c2..cb08870 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ dependencies = [ "httptools>=0.6", "sqlglot>=25.0", "pyjwt>=2.8,<3", + "google-cloud-storage>=2.14", ] [project.optional-dependencies] diff --git a/tests/conftest.py b/tests/conftest.py index 59d2f72..bcf0889 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,15 +1,29 @@ from __future__ import annotations -from collections.abc import Iterator -from typing import Any - -import pytest -from datastore.api.context import get_auth_provider, get_ckan_client -from datastore.auth.ckan import Provider as CKANAuthProvider -from datastore.core.exceptions import AuthorizationError, NotFoundError -from datastore.infrastructure.cache import InMemoryCache -from datastore.main import create_app -from fastapi.testclient import TestClient +import os + +# Neutralise any developer .env before `datastore.main` is imported — +# `create_app()` runs at module load and reads the live process env, so +# fixtures can't intercept BigQuery vars in time. Clearing them here +# keeps the suite hermetic against whatever happens to be in .env. +for _name in ( + "BIGQUERY_PROJECT", "BIGQUERY_DATASET", + "BIGQUERY_CREDENTIALS", "BIGQUERY_CREDENTIALS_RO", + "BIGQUERY_EXPORT_BUCKET", +): + os.environ[_name] = "" +os.environ["BIGQUERY_EXPORT_URL_EXPIRY_HOURS"] = "1" + +from collections.abc import Iterator # noqa: E402 +from typing import Any # noqa: E402 + +import pytest # noqa: E402 +from datastore.api.context import get_auth_provider, get_ckan_client # noqa: E402 +from datastore.auth.ckan import Provider as CKANAuthProvider # noqa: E402 +from datastore.core.exceptions import AuthorizationError, NotFoundError # noqa: E402 +from datastore.infrastructure.cache import InMemoryCache # noqa: E402 +from datastore.main import create_app # noqa: E402 +from fastapi.testclient import TestClient # noqa: E402 @pytest.fixture(autouse=True) @@ -34,8 +48,12 @@ def _isolate_bigquery_env(monkeypatch: pytest.MonkeyPatch) -> None: for name in ( "BIGQUERY_PROJECT", "BIGQUERY_DATASET", "BIGQUERY_CREDENTIALS", "BIGQUERY_CREDENTIALS_RO", + "BIGQUERY_EXPORT_BUCKET", ): monkeypatch.setenv(name, "") + # Pydantic-Settings can't parse "" as int — give the dump-URL TTL a + # valid placeholder so a stray .env doesn't break startup in tests. + monkeypatch.setenv("BIGQUERY_EXPORT_URL_EXPIRY_HOURS", "1") # `Config` and engine instances are lru-cached / module-level # singletons; invalidate so the cleared env actually takes effect. get_config.cache_clear() diff --git a/tests/test_datastore_dump.py b/tests/test_datastore_dump.py new file mode 100644 index 0000000..ee856fb --- /dev/null +++ b/tests/test_datastore_dump.py @@ -0,0 +1,592 @@ +"""Tests for `GET /datastore/dump/{resource_id}`. + +Engine returns a list of signed-URL shards: + - len == 1 → endpoint 302s to the URL (no server bandwidth). + - len > 1 → endpoint stream-concats shards from GCS via async httpx. + +We patch `BigQueryBackend.dump` to control how many "shards" the +engine reports, and patch `httpx.AsyncClient` for the stream-concat +tests so they don't try to fetch real URLs. +""" + +from __future__ import annotations + +from collections.abc import AsyncIterator +from typing import Any +from unittest.mock import MagicMock, patch + +import pytest +from datastore.core.exceptions import PayloadTooLargeError, ServerError +from datastore.infrastructure.engines.bigquery import BigQueryBackend +from datastore.infrastructure.engines.bigquery.backend import ( + _build_export_select, + _is_export_too_large, +) +from datastore.services.dump import _skip_first_line +from fastapi.testclient import TestClient + +from tests.conftest import FakeCKAN + +DUMP_URL = "/datastore/dump/balancing_auction_results_2025" + + +def _patch_dump(urls_or_exc: list[str] | Exception): + """Patch `BigQueryBackend.dump` to return URLs or raise.""" + async def fake(self: BigQueryBackend, resource_id: str, fmt: str) -> list[str]: + if isinstance(urls_or_exc, Exception): + raise urls_or_exc + return urls_or_exc + return patch.object(BigQueryBackend, "dump", fake) + + +# --- single shard: 302 redirect ------------------------------------------- + + +def test_single_shard_returns_302(client: TestClient) -> None: + url = "https://storage.googleapis.com/bucket/dumps/x/abc.csv?Sig=abc" + with _patch_dump([url]): + response = client.get(DUMP_URL, follow_redirects=False) + assert response.status_code == 302 + assert response.headers["location"] == url + assert response.content == b"" + + +@pytest.mark.parametrize("fmt", ["csv", "ndjson", "parquet"]) +def test_each_format_supports_single_shard_redirect( + fmt: str, client: TestClient, +) -> None: + with _patch_dump([f"https://example/x.{fmt}"]): + response = client.get( + DUMP_URL, params={"format": fmt}, follow_redirects=False, + ) + assert response.status_code == 302 + + +# --- multi-shard: stream-concat ------------------------------------------- + + +def test_multi_shard_csv_stream_concat_dedups_header( + client: TestClient, +) -> None: + """Header from shard 1 only; shards 2..N have their first line + dropped before bytes hit the client.""" + shards = { + "url-1": b"col1,col2\na,1\nb,2\n", + "url-2": b"col1,col2\nc,3\nd,4\n", + "url-3": b"col1,col2\ne,5\n", + } + + with _patch_dump(list(shards.keys())), _patch_httpx_stream(shards): + response = client.get(DUMP_URL, follow_redirects=False) + + assert response.status_code == 200 + assert response.headers["content-type"].startswith("text/csv") + assert response.headers["content-disposition"] == ( + 'attachment; filename="balancing_auction_results_2025.csv"' + ) + # Header once, then all rows in order. + assert response.text.splitlines() == [ + "col1,col2", + "a,1", "b,2", + "c,3", "d,4", + "e,5", + ] + + +def test_multi_shard_ndjson_pure_byte_concat(client: TestClient) -> None: + """Each NDJSON shard is self-contained; bytes concatenate cleanly.""" + shards = { + "url-1": b'{"id":1}\n{"id":2}\n', + "url-2": b'{"id":3}\n', + } + with _patch_dump(list(shards.keys())), _patch_httpx_stream(shards): + response = client.get( + DUMP_URL, params={"format": "ndjson"}, follow_redirects=False, + ) + assert response.status_code == 200 + assert response.headers["content-type"].startswith("application/x-ndjson") + assert response.text == ( + '{"id":1}\n{"id":2}\n' + '{"id":3}\n' + ) + + +# --- error paths ---------------------------------------------------------- + + +def test_too_large_parquet_returns_413(client: TestClient) -> None: + with _patch_dump(PayloadTooLargeError("exceeds 1 GB after parquet export")): + response = client.get(DUMP_URL, params={"format": "parquet"}) + assert response.status_code == 413 + assert response.json()["error"]["__type"] == "Payload Too Large" + + +def test_unknown_format_returns_validation_error(client: TestClient) -> None: + response = client.get(DUMP_URL, params={"format": "xml"}) + assert response.status_code == 400 + assert response.json()["error"]["__type"] == "Validation Error" + + +def test_dump_for_unknown_resource_returns_404(client: TestClient) -> None: + response = client.get("/datastore/dump/missing-resource") + assert response.status_code == 404 + + +# --- auth ----------------------------------------------------------------- + + +def test_dump_without_api_key_succeeds_when_public( + client: TestClient, fake_ckan: FakeCKAN, +) -> None: + with _patch_dump(["https://example/a.csv?sig=1"]): + client.headers.pop("Authorization", None) + response = client.get(DUMP_URL, follow_redirects=False) + assert response.status_code == 302 + assert fake_ckan.authorize_calls >= 1 + + +def test_dump_with_denied_key_returns_403( + client: TestClient, fake_ckan: FakeCKAN, +) -> None: + fake_ckan.deny("test-token") + response = client.get(DUMP_URL) + assert response.status_code == 403 + + +# --- helpers: ISO date casting -------------------------------------------- + + +def test_build_export_select_iso_casts_timestamp_and_datetime() -> None: + schema = [ + _bq_field("auction_id", "INT64"), + _bq_field("delivery_start", "TIMESTAMP"), + _bq_field("delivery_local", "DATETIME"), + _bq_field("delivery_day", "DATE"), + ] + select = _build_export_select(schema, fmt="csv") + assert ( + "FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%E*SZ', `delivery_start`, 'UTC')" + in select + ) + assert ( + "FORMAT_DATETIME('%Y-%m-%dT%H:%M:%E*S', `delivery_local`)" + in select + ) + assert "`auction_id`" in select + assert "`delivery_day`" in select + + +def test_build_export_select_parquet_returns_star() -> None: + schema = [_bq_field("delivery_start", "TIMESTAMP")] + assert _build_export_select(schema, fmt="parquet") == "*" + + +def _bq_field(name: str, field_type: str) -> Any: + f = MagicMock() + f.name = name + f.field_type = field_type + return f + + +# --- helpers: too-large heuristic ----------------------------------------- + + +@pytest.mark.parametrize("message", [ + "Operation cannot be completed when exporting to a single URI", + "Cannot export more than 1 GB to a single URI; use the wildcard operator", +]) +def test_too_large_marker_is_recognised(message: str) -> None: + assert _is_export_too_large(RuntimeError(message)) is True + + +def test_unrelated_error_is_not_classified_as_too_large() -> None: + assert _is_export_too_large(RuntimeError("auth failed")) is False + + +# --- helpers: CSV header-skip ------------------------------------------- + + +def test_skip_first_line_drops_header_and_forwards_rest() -> None: + """`_skip_first_line` strips up to and including the first `\\n`, + then byte-forwards everything else unchanged.""" + import asyncio + + async def chunks() -> AsyncIterator[bytes]: + yield b"col1,col2\n" + yield b"a,1\n" + yield b"b,2\n" + + async def run() -> bytes: + out = bytearray() + async for chunk in _skip_first_line(chunks()): + out.extend(chunk) + return bytes(out) + + assert asyncio.run(run()) == b"a,1\nb,2\n" + + +def test_skip_first_line_handles_header_split_across_chunks() -> None: + """The newline may not arrive in the first chunk — verify the + buffer accumulates until the newline is found.""" + import asyncio + + async def chunks() -> AsyncIterator[bytes]: + yield b"col1," # no newline yet + yield b"col2\n" + yield b"row,1\n" + + async def run() -> bytes: + out = bytearray() + async for chunk in _skip_first_line(chunks()): + out.extend(chunk) + return bytes(out) + + assert asyncio.run(run()) == b"row,1\n" + + +# --- engine: placeholder + bucket-missing guards ------------------------- + + +def test_dump_polling_releases_event_loop_between_reloads() -> None: + """Polling loop should `asyncio.sleep` between `job.reload` calls + so other coroutines on the same loop keep running. Verified by + interleaving a ticker — if the dump call hogged the loop, the + ticker would barely advance during the polls. + + The job is flagged with `error_result` once it transitions to DONE + so `dump()` raises immediately after the polling loop, without + reaching the post-extract GCS read.""" + import asyncio + + # `_engine_with_storage` stubs `google.cloud.storage` and gives the + # backend a real `table.modified` so the pre-extract cache lookup + # (empty here → cache miss → polling branch) doesn't blow up. + backend, storage_client = _engine_with_storage([]) + bucket_obj = storage_client.bucket.return_value + # Cache lookup returns no shards → fall through into the extract / + # poll branch. The post-extract retry is never reached because the + # job ends with an error. + bucket_obj.list_blobs.return_value = [] + + job = MagicMock() + job.state = "PENDING" + reload_calls = 0 + + def fake_reload() -> None: + nonlocal reload_calls + reload_calls += 1 + if reload_calls >= 3: + job.state = "DONE" + # Flag an error so dump raises right after the loop and + # doesn't try to read the GCS shard list. + job.error_result = {"message": "test-only error"} + + job.error_result = None + job.reload = fake_reload + backend.client.query.return_value = job + + # Speed up the test: 50 ms poll interval. + with patch( + "datastore.infrastructure.engines.bigquery.backend" + "._DUMP_POLL_INTERVAL_SECONDS", + 0.05, + ): + async def run() -> int: + ticks = 0 + + async def tick() -> None: + nonlocal ticks + while True: + await asyncio.sleep(0) + ticks += 1 + + ticker = asyncio.create_task(tick()) + try: + with pytest.raises(ServerError, match="test-only error"): + await backend.dump("res-1", "csv") + finally: + ticker.cancel() + try: + await ticker + except asyncio.CancelledError: + pass + return ticks + + ticks = asyncio.run(run()) + # 2 sleeps × 50 ms = 100 ms minimum of loop-yielding time; + # the ticker should rack up far more iterations than the + # number of reload calls if the loop is genuinely free. + assert ticks > reload_calls * 10, ( + f"event loop appears blocked during polling: " + f"only {ticks} ticker passes for {reload_calls} reloads" + ) + + +# --- engine: GCS-backed cache by table.modified -------------------------- + + +def _engine_with_storage(storage_blobs: list[Any]) -> tuple[BigQueryBackend, Any]: + """Build a configured BigQueryBackend whose mocked storage client + returns `storage_blobs` from `list_blobs`. Returns the backend + + the storage Client mock so callers can assert on it. + + Tests below patch `from google.cloud import storage` (the lazy + import inside `dump`) so they don't depend on the real package + being installed in the test env. + """ + import datetime as dt + import sys + import types + + backend = BigQueryBackend(mode="ro") + backend.client = MagicMock() + backend.config = MagicMock() + backend.config.BIGQUERY_PROJECT = "proj-1" + backend.config.BIGQUERY_DATASET = "ds-1" + backend.config.BIGQUERY_EXPORT_BUCKET = "bkt" + backend.config.BIGQUERY_EXPORT_URL_EXPIRY_HOURS = 1 + # Empty creds → load_credentials returns None → storage.Client uses + # ADC (which we've stubbed via sys.modules below). + backend.config.BIGQUERY_CREDENTIALS = "" + backend.config.BIGQUERY_CREDENTIALS_RO = "" + backend.metadata = None + + table = MagicMock() + table.schema = [] + # Stable `modified` → stable cache key across calls. + table.modified = dt.datetime(2026, 1, 1, tzinfo=dt.timezone.utc) + backend.client.get_table.return_value = table + + storage_client = MagicMock() + bucket_obj = storage_client.bucket.return_value + bucket_obj.list_blobs.return_value = list(storage_blobs) + + # Stub the lazy `from google.cloud import storage` inside + # `_build_storage_client` so test envs without google-cloud-storage + # still resolve. Both helpers below override the constructor anyway. + fake_module = types.ModuleType("storage") + fake_module.Client = MagicMock(return_value=storage_client) + sys.modules["google.cloud.storage"] = fake_module + + # Inject the same `storage_client` mock for both ro and rw GCS work + # (a single mock keeps test assertions on `list_blobs` / + # `bucket.delete` in one place). Inject `backend.client` as the + # rw BigQuery client so `client.query.return_value = job` + # assertions still drive the cache-miss extract path. + backend._build_storage_client = MagicMock(return_value=storage_client) + backend._build_bq_client = MagicMock(return_value=backend.client) + + return backend, storage_client + + +def _fake_blob(name: str, signed_url: str = "https://signed/x") -> Any: + blob = MagicMock() + blob.name = name + blob.generate_signed_url.return_value = signed_url + return blob + + +def test_dump_cache_hit_skips_extract_job() -> None: + """When GCS already has shards for this `(rid, fmt, table.modified)`, + `dump()` returns signed URLs straight from the cache — no + `client.query` call to BigQuery.""" + import asyncio + + blob = _fake_blob("dumps/res-1/csv/.csv", "https://cached") + backend, _ = _engine_with_storage([blob]) + + urls = asyncio.run(backend.dump("res-1", "csv")) + + assert urls == ["https://cached"] + # No extract job submitted — that's the whole point of caching. + assert backend.client.query.call_count == 0 + + +def test_dump_cache_miss_submits_extract_then_returns_urls() -> None: + """First call to `list_blobs` returns empty (cache miss); + `dump()` submits the extract, then `list_blobs` returns the + written shards on the post-extract retry.""" + import asyncio + + new_blob = _fake_blob("dumps/res-1/csv/_000.csv", "https://fresh") + backend, storage_client = _engine_with_storage([]) + bucket_obj = storage_client.bucket.return_value + # Pre-extract: empty. Post-extract refresh: one shard. GC sweep: + # same one shard (nothing stale to delete on first dump ever). + bucket_obj.list_blobs.side_effect = [[], [new_blob], [new_blob]] + + # Job goes straight to DONE without errors. + job = MagicMock() + job.state = "DONE" + job.error_result = None + backend.client.query.return_value = job + + urls = asyncio.run(backend.dump("res-1", "csv")) + + assert urls == ["https://fresh"] + # Exactly one extract submitted on cache miss. + assert backend.client.query.call_count == 1 + + +def test_dump_cache_miss_deletes_older_revisions() -> None: + """After a successful extract on cache miss, blobs from any older + revision under `dumps///` should be deleted to keep + storage from growing unbounded across table updates. The current + revision's blobs stay.""" + import asyncio + import datetime as dt + + backend, storage_client = _engine_with_storage([]) + bucket_obj = storage_client.bucket.return_value + + # Match the rev that backend.dump() computes from table.modified + # (`_engine_with_storage` sets it to 2026-01-01 UTC). + table_modified = dt.datetime(2026, 1, 1, tzinfo=dt.timezone.utc) + rev = f"{int(table_modified.timestamp() * 1_000_000):x}" + new_blob = _fake_blob( + f"dumps/res-1/csv/{rev}_000.csv", "https://fresh" + ) + old_blob_a = _fake_blob("dumps/res-1/csv/oldrev1_000.csv") + old_blob_b = _fake_blob("dumps/res-1/csv/oldrev2_000.csv") + + # Calls in order: + # 1) pre-extract cache lookup (prefix=dumps/.../) → empty + # 2) post-extract refresh (prefix=dumps/.../) → [new] + # 3) GC sweep (prefix=dumps/res-1/csv/) → [new, old_a, old_b] + bucket_obj.list_blobs.side_effect = [ + [], + [new_blob], + [new_blob, old_blob_a, old_blob_b], + ] + + job = MagicMock() + job.state = "DONE" + job.error_result = None + backend.client.query.return_value = job + + urls = asyncio.run(backend.dump("res-1", "csv")) + + assert urls == ["https://fresh"] + # The current revision must not be deleted. + assert new_blob.delete.call_count == 0 + # Both older revisions get cleaned up. + assert old_blob_a.delete.call_count == 1 + assert old_blob_b.delete.call_count == 1 + + +def test_dump_cache_hit_does_not_delete_anything() -> None: + """A cache hit must not trigger GC — there's no new revision to + supersede the existing one.""" + import asyncio + + cached = _fake_blob("dumps/res-1/csv/_000.csv", "https://cached") + backend, _ = _engine_with_storage([cached]) + + urls = asyncio.run(backend.dump("res-1", "csv")) + + assert urls == ["https://cached"] + # No extract → no GC. + assert cached.delete.call_count == 0 + + +def test_dump_cache_key_changes_when_table_modified_advances() -> None: + """Different `table.modified` → different cache prefix → different + `list_blobs(prefix=…)` call. Stale cache from an older revision + can't satisfy a newer request.""" + import asyncio + import datetime as dt + + backend, storage_client = _engine_with_storage([]) + bucket_obj = storage_client.bucket.return_value + + table = backend.client.get_table.return_value + # Each dump-on-cache-hit lists twice: ro lookup + rw re-fetch for + # signing. Both calls must return the same blob. + table.modified = dt.datetime(2026, 1, 1, tzinfo=dt.timezone.utc) + first_old = _fake_blob("dumps/res-1/csv/old.csv", "https://old") + bucket_obj.list_blobs.side_effect = [[first_old], [first_old]] + asyncio.run(backend.dump("res-1", "csv")) + # Both calls used the same prefix (the cache-hit rev) — take the + # earlier one to compare against the next dump's prefix. + first_prefix = bucket_obj.list_blobs.call_args_list[0].kwargs["prefix"] + + # Bump the table; new call hits a different prefix. + table.modified = dt.datetime(2026, 2, 1, tzinfo=dt.timezone.utc) + second_new = _fake_blob("dumps/res-1/csv/new.csv", "https://new") + bucket_obj.list_blobs.side_effect = [[second_new], [second_new]] + asyncio.run(backend.dump("res-1", "csv")) + second_prefix = bucket_obj.list_blobs.call_args_list[-2].kwargs["prefix"] + + assert first_prefix != second_prefix, ( + "table.modified change must produce a different cache key" + ) + + +def test_dump_returns_empty_list_in_placeholder_mode() -> None: + import asyncio + + backend = BigQueryBackend(mode="ro") + assert asyncio.run(backend.dump("res-1", "csv")) == [] + + +def test_dump_raises_when_export_bucket_unset() -> None: + import asyncio + + backend = BigQueryBackend(mode="ro") + backend.client = MagicMock() + backend.config = MagicMock() + backend.config.BIGQUERY_PROJECT = "proj-1" + backend.config.BIGQUERY_DATASET = "ds-1" + backend.config.BIGQUERY_EXPORT_BUCKET = "" + + with pytest.raises(ServerError, match="BIGQUERY_EXPORT_BUCKET"): + asyncio.run(backend.dump("res-1", "csv")) + + +# --- test infrastructure -------------------------------------------------- + + +def _patch_httpx_stream(shards: dict[str, bytes]): + """Patch `httpx.AsyncClient.stream` so its async context-manager + returns a fake `Response` whose `aiter_bytes` walks the bytes for + that URL. Lets us drive the stream-concat helpers without hitting + the network.""" + + def make_resp(data: bytes) -> Any: + resp = MagicMock() + resp.raise_for_status = MagicMock(return_value=None) + + async def aiter_bytes(chunk_size: int = 64 * 1024): + # Walk the fixture bytes in `chunk_size` slices so callers + # see real chunk boundaries (matters for the header-skip + # path which may span chunks). + for i in range(0, len(data), chunk_size): + yield data[i:i + chunk_size] + + resp.aiter_bytes = aiter_bytes + return resp + + class FakeStreamCtx: + def __init__(self, url: str) -> None: + self._url = url + + async def __aenter__(self) -> Any: + return make_resp(shards[self._url]) + + async def __aexit__(self, *a: Any) -> None: + pass + + class FakeClient: + async def __aenter__(self) -> "FakeClient": + return self + + async def __aexit__(self, *a: Any) -> None: + pass + + def stream(self, method: str, url: str) -> FakeStreamCtx: + return FakeStreamCtx(url) + + return patch( + "datastore.services.dump.httpx.AsyncClient", + MagicMock(return_value=FakeClient()), + )