From dee8d72397eb50f56a1900ff0b3753f0a672236b Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Thu, 21 May 2026 16:33:04 +0200 Subject: [PATCH 1/5] Add UK geography asset resolution --- changelog.d/added/uk-geography-assets.md | 1 + docs/outputs.md | 6 +- docs/regions.md | 10 +- src/policyengine/outputs/__init__.py | 20 ++ .../outputs/constituency_impact.py | 30 +- .../outputs/local_authority_impact.py | 30 +- .../outputs/uk_geography_assets.py | 273 ++++++++++++++++++ tests/test_constituency_impact.py | 44 +++ tests/test_local_authority_impact.py | 47 +++ tests/test_uk_geography_assets.py | 102 +++++++ 10 files changed, 540 insertions(+), 23 deletions(-) create mode 100644 changelog.d/added/uk-geography-assets.md create mode 100644 src/policyengine/outputs/uk_geography_assets.py create mode 100644 tests/test_uk_geography_assets.py diff --git a/changelog.d/added/uk-geography-assets.md b/changelog.d/added/uk-geography-assets.md new file mode 100644 index 00000000..62c8dd5b --- /dev/null +++ b/changelog.d/added/uk-geography-assets.md @@ -0,0 +1 @@ +UK constituency and local-authority output helpers can now resolve standard geography files locally or download them from GCS by default. diff --git a/docs/outputs.md b/docs/outputs.md index 788f45dd..2f2f2437 100644 --- a/docs/outputs.md +++ b/docs/outputs.md @@ -242,7 +242,7 @@ for row in impacts.district_results: ### UK constituencies / local authorities -Constituency and local-authority breakdowns require externally-supplied weight matrices: +Constituency and local-authority breakdowns use externally-maintained weight matrices. The convenience helpers first look for the standard files locally, then download them from the PolicyEngine UK GCS bucket: ```python from policyengine.outputs import compute_uk_constituency_impacts @@ -250,14 +250,12 @@ from policyengine.outputs import compute_uk_constituency_impacts impacts = compute_uk_constituency_impacts( baseline_simulation=baseline, reform_simulation=reform, - weight_matrix_path="parliamentary_constituency_weights.h5", - constituency_csv_path="constituencies_2024.csv", year="2025", ) impacts.constituency_results ``` -`compute_uk_local_authority_impacts` follows the same pattern. See [Regions](regions.md). +`compute_uk_local_authority_impacts` follows the same pattern. Pass explicit paths to bypass the resolver, or set `POLICYENGINE_UK_GEOGRAPHY_DATA_DIR` to choose the local lookup and download cache directory. See [Regions](regions.md). ## Writing your own diff --git a/docs/regions.md b/docs/regions.md index 9aa30e3b..70b52a02 100644 --- a/docs/regions.md +++ b/docs/regions.md @@ -48,7 +48,7 @@ for row in impacts.district_results: ## UK parliamentary constituencies -Constituency-level impacts reweight every household to each constituency's demographic profile using a pre-computed weight matrix, so both the weight file and a constituency metadata CSV are required inputs: +Constituency-level impacts reweight every household to each constituency's demographic profile using a pre-computed weight matrix. By default, PolicyEngine looks for the standard constituency files locally and downloads them from the PolicyEngine UK GCS bucket if they are not present: ```python from policyengine.outputs import compute_uk_constituency_impacts @@ -56,13 +56,13 @@ from policyengine.outputs import compute_uk_constituency_impacts impacts = compute_uk_constituency_impacts( baseline_simulation=baseline, reform_simulation=reform, - weight_matrix_path="parliamentary_constituency_weights.h5", - constituency_csv_path="constituencies_2024.csv", year="2025", ) impacts.constituency_results ``` +To force local files, pass `weight_matrix_path` and `constituency_csv_path`. To set a reusable local data directory, set `POLICYENGINE_UK_GEOGRAPHY_DATA_DIR`. + ## UK local authorities ```python @@ -71,13 +71,13 @@ from policyengine.outputs import compute_uk_local_authority_impacts impacts = compute_uk_local_authority_impacts( baseline_simulation=baseline, reform_simulation=reform, - weight_matrix_path="local_authority_weights.h5", - local_authority_csv_path="local_authorities_2021.csv", year="2025", ) impacts.local_authority_results ``` +`compute_uk_local_authority_impacts` accepts explicit paths with `weight_matrix_path` and `local_authority_csv_path` when callers need to bypass the default local-first, GCS-fallback resolver. + ## Region registries `pe.us.model.region_registry` and `pe.uk.model.region_registry` enumerate supported sub-national units: diff --git a/src/policyengine/outputs/__init__.py b/src/policyengine/outputs/__init__.py index 00936706..f2f27849 100644 --- a/src/policyengine/outputs/__init__.py +++ b/src/policyengine/outputs/__init__.py @@ -57,6 +57,17 @@ calculate_us_poverty_rates, ) from policyengine.outputs.program_statistics import ProgramStatistics +from policyengine.outputs.uk_geography_assets import ( + CONSTITUENCY_ASSET_SPEC, + LOCAL_AUTHORITY_ASSET_SPEC, + GCSUKGeographyAssetStrategy, + LocalUKGeographyAssetStrategy, + UKGeographyAssetPaths, + UKGeographyAssetSpec, + UKGeographyAssetStrategy, + default_uk_geography_asset_strategies, + resolve_uk_geography_asset_paths, +) __all__ = [ "Output", @@ -102,4 +113,13 @@ "compute_uk_constituency_impacts", "LocalAuthorityImpact", "compute_uk_local_authority_impacts", + "CONSTITUENCY_ASSET_SPEC", + "LOCAL_AUTHORITY_ASSET_SPEC", + "GCSUKGeographyAssetStrategy", + "LocalUKGeographyAssetStrategy", + "UKGeographyAssetPaths", + "UKGeographyAssetSpec", + "UKGeographyAssetStrategy", + "default_uk_geography_asset_strategies", + "resolve_uk_geography_asset_paths", ] diff --git a/src/policyengine/outputs/constituency_impact.py b/src/policyengine/outputs/constituency_impact.py index 02e1bdfd..3fe1ae4b 100644 --- a/src/policyengine/outputs/constituency_impact.py +++ b/src/policyengine/outputs/constituency_impact.py @@ -5,13 +5,18 @@ that reweights all households to represent that constituency's demographics. """ -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, Sequence import numpy as np import pandas as pd from pydantic import ConfigDict from policyengine.core import Output +from policyengine.outputs.uk_geography_assets import ( + CONSTITUENCY_ASSET_SPEC, + UKGeographyAssetStrategy, + resolve_uk_geography_asset_paths, +) if TYPE_CHECKING: from policyengine.core.simulation import Simulation @@ -100,27 +105,38 @@ def run(self) -> None: def compute_uk_constituency_impacts( baseline_simulation: "Simulation", reform_simulation: "Simulation", - weight_matrix_path: str, - constituency_csv_path: str, + weight_matrix_path: Optional[str] = None, + constituency_csv_path: Optional[str] = None, year: str = "2025", + asset_strategies: Optional[Sequence[UKGeographyAssetStrategy]] = None, ) -> ConstituencyImpact: """Compute per-constituency income changes for UK. Args: baseline_simulation: Completed baseline simulation. reform_simulation: Completed reform simulation. - weight_matrix_path: Path to parliamentary_constituency_weights.h5. - constituency_csv_path: Path to constituencies_2024.csv. + weight_matrix_path: Optional path to parliamentary_constituency_weights.h5. + If omitted, standard local paths are checked before downloading from GCS. + constituency_csv_path: Optional path to constituencies_2024.csv. + If omitted, standard local paths are checked before downloading from GCS. year: Year key in the H5 file (default "2025"). + asset_strategies: Optional resolver strategy chain. Defaults to local lookup, + then GCS download. Returns: ConstituencyImpact with constituency_results populated. """ + paths = resolve_uk_geography_asset_paths( + CONSTITUENCY_ASSET_SPEC, + weight_matrix_path=weight_matrix_path, + lookup_csv_path=constituency_csv_path, + asset_strategies=asset_strategies, + ) impact = ConstituencyImpact.model_construct( baseline_simulation=baseline_simulation, reform_simulation=reform_simulation, - weight_matrix_path=weight_matrix_path, - constituency_csv_path=constituency_csv_path, + weight_matrix_path=paths.weight_matrix_path, + constituency_csv_path=paths.lookup_csv_path, year=year, ) impact.run() diff --git a/src/policyengine/outputs/local_authority_impact.py b/src/policyengine/outputs/local_authority_impact.py index a4850dbf..7b36c17c 100644 --- a/src/policyengine/outputs/local_authority_impact.py +++ b/src/policyengine/outputs/local_authority_impact.py @@ -5,13 +5,18 @@ that reweights all households to represent that local authority's demographics. """ -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, Sequence import numpy as np import pandas as pd from pydantic import ConfigDict from policyengine.core import Output +from policyengine.outputs.uk_geography_assets import ( + LOCAL_AUTHORITY_ASSET_SPEC, + UKGeographyAssetStrategy, + resolve_uk_geography_asset_paths, +) if TYPE_CHECKING: from policyengine.core.simulation import Simulation @@ -99,27 +104,38 @@ def run(self) -> None: def compute_uk_local_authority_impacts( baseline_simulation: "Simulation", reform_simulation: "Simulation", - weight_matrix_path: str, - local_authority_csv_path: str, + weight_matrix_path: Optional[str] = None, + local_authority_csv_path: Optional[str] = None, year: str = "2025", + asset_strategies: Optional[Sequence[UKGeographyAssetStrategy]] = None, ) -> LocalAuthorityImpact: """Compute per-local-authority income changes for UK. Args: baseline_simulation: Completed baseline simulation. reform_simulation: Completed reform simulation. - weight_matrix_path: Path to local_authority_weights.h5. - local_authority_csv_path: Path to local_authorities_2021.csv. + weight_matrix_path: Optional path to local_authority_weights.h5. + If omitted, standard local paths are checked before downloading from GCS. + local_authority_csv_path: Optional path to local_authorities_2021.csv. + If omitted, standard local paths are checked before downloading from GCS. year: Year key in the H5 file (default "2025"). + asset_strategies: Optional resolver strategy chain. Defaults to local lookup, + then GCS download. Returns: LocalAuthorityImpact with local_authority_results populated. """ + paths = resolve_uk_geography_asset_paths( + LOCAL_AUTHORITY_ASSET_SPEC, + weight_matrix_path=weight_matrix_path, + lookup_csv_path=local_authority_csv_path, + asset_strategies=asset_strategies, + ) impact = LocalAuthorityImpact.model_construct( baseline_simulation=baseline_simulation, reform_simulation=reform_simulation, - weight_matrix_path=weight_matrix_path, - local_authority_csv_path=local_authority_csv_path, + weight_matrix_path=paths.weight_matrix_path, + local_authority_csv_path=paths.lookup_csv_path, year=year, ) impact.run() diff --git a/src/policyengine/outputs/uk_geography_assets.py b/src/policyengine/outputs/uk_geography_assets.py new file mode 100644 index 00000000..f66be478 --- /dev/null +++ b/src/policyengine/outputs/uk_geography_assets.py @@ -0,0 +1,273 @@ +"""Resolve UK geography output assets from local files or GCS.""" + +from __future__ import annotations + +import os +from dataclasses import dataclass +from pathlib import Path +from typing import Optional, Sequence, Union + +UK_GEOGRAPHY_BUCKET = "policyengine-uk-data-private" +UK_GEOGRAPHY_DATA_DIR_ENV = "POLICYENGINE_UK_GEOGRAPHY_DATA_DIR" +POLICYENGINE_DATA_FOLDER_ENV = "POLICYENGINE_DATA_FOLDER" + + +@dataclass(frozen=True) +class UKGeographyAssetSpec: + """The paired files needed to compute one UK geography output.""" + + geography_type: str + weight_matrix_filename: str + lookup_csv_filename: str + bucket: str = UK_GEOGRAPHY_BUCKET + + +@dataclass(frozen=True) +class UKGeographyAssetPaths: + """Resolved local paths for a UK geography output.""" + + weight_matrix_path: str + lookup_csv_path: str + + +CONSTITUENCY_ASSET_SPEC = UKGeographyAssetSpec( + geography_type="constituency", + weight_matrix_filename="parliamentary_constituency_weights.h5", + lookup_csv_filename="constituencies_2024.csv", +) + +LOCAL_AUTHORITY_ASSET_SPEC = UKGeographyAssetSpec( + geography_type="local_authority", + weight_matrix_filename="local_authority_weights.h5", + lookup_csv_filename="local_authorities_2021.csv", +) + + +def _env_path(name: str) -> Optional[Path]: + value = os.environ.get(name) + if not value: + return None + return Path(value).expanduser() + + +def _dedupe_paths(paths: Sequence[Path]) -> list[Path]: + result: list[Path] = [] + seen: set[str] = set() + for path in paths: + key = str(path.resolve()) if path.exists() else str(path.absolute()) + if key in seen: + continue + seen.add(key) + result.append(path) + return result + + +def default_local_search_dirs() -> list[Path]: + """Return directories searched before downloading UK geography assets.""" + candidates = [ + _env_path(UK_GEOGRAPHY_DATA_DIR_ENV), + _env_path(POLICYENGINE_DATA_FOLDER_ENV), + Path(".datasets"), + Path.cwd(), + ] + return _dedupe_paths([path for path in candidates if path is not None]) + + +def default_download_dir() -> Path: + """Return the default GCS download cache for UK geography assets.""" + configured = _env_path(UK_GEOGRAPHY_DATA_DIR_ENV) or _env_path( + POLICYENGINE_DATA_FOLDER_ENV + ) + if configured is not None: + return configured + return Path.home() / ".policyengine" / "uk-geography" + + +class UKGeographyAssetStrategy: + """Base class for UK geography asset resolution strategies.""" + + last_error: Optional[str] = None + + def resolve( + self, + spec: UKGeographyAssetSpec, + *, + weight_matrix_path: Optional[str] = None, + lookup_csv_path: Optional[str] = None, + ) -> Optional[UKGeographyAssetPaths]: + raise NotImplementedError + + +PathLike = Union[os.PathLike, str] + + +class LocalUKGeographyAssetStrategy(UKGeographyAssetStrategy): + """Resolve geography assets from explicit paths or local search dirs.""" + + def __init__(self, search_dirs: Optional[Sequence[PathLike]] = None): + self.search_dirs = [ + Path(path).expanduser() + for path in ( + search_dirs if search_dirs is not None else default_local_search_dirs() + ) + ] + self.last_error = None + + def _resolve_asset( + self, + *, + explicit_path: Optional[str], + filename: str, + ) -> Optional[Path]: + if explicit_path: + path = Path(explicit_path).expanduser() + if path.is_file(): + return path + + for search_dir in self.search_dirs: + path = search_dir / filename + if path.is_file(): + return path + return None + + def resolve( + self, + spec: UKGeographyAssetSpec, + *, + weight_matrix_path: Optional[str] = None, + lookup_csv_path: Optional[str] = None, + ) -> Optional[UKGeographyAssetPaths]: + weight_path = self._resolve_asset( + explicit_path=weight_matrix_path, + filename=spec.weight_matrix_filename, + ) + csv_path = self._resolve_asset( + explicit_path=lookup_csv_path, + filename=spec.lookup_csv_filename, + ) + + if weight_path is not None and csv_path is not None: + self.last_error = None + return UKGeographyAssetPaths( + weight_matrix_path=str(weight_path), + lookup_csv_path=str(csv_path), + ) + + missing = [] + if weight_path is None: + missing.append(spec.weight_matrix_filename) + if csv_path is None: + missing.append(spec.lookup_csv_filename) + searched = ", ".join(str(path) for path in self.search_dirs) + self.last_error = ( + "local lookup missing " + + ", ".join(missing) + + f"; searched: {searched or ''}" + ) + return None + + +class GCSUKGeographyAssetStrategy(UKGeographyAssetStrategy): + """Download geography assets from the standard PolicyEngine UK GCS bucket.""" + + def __init__(self, download_dir: Optional[PathLike] = None): + self.download_dir = ( + Path(download_dir).expanduser() + if download_dir is not None + else default_download_dir() + ) + self.last_error = None + + def _download_asset( + self, + *, + bucket: str, + filename: str, + explicit_path: Optional[str], + ) -> Path: + if explicit_path: + path = Path(explicit_path).expanduser() + if path.is_file(): + return path + + target_path = self.download_dir / filename + if target_path.is_file(): + return target_path + + self.download_dir.mkdir(parents=True, exist_ok=True) + + from policyengine_core.tools.google_cloud import download_gcs_file + + downloaded_path = download_gcs_file( + bucket=bucket, + file_path=filename, + local_path=str(target_path), + ) + return Path(downloaded_path).expanduser() + + def resolve( + self, + spec: UKGeographyAssetSpec, + *, + weight_matrix_path: Optional[str] = None, + lookup_csv_path: Optional[str] = None, + ) -> Optional[UKGeographyAssetPaths]: + try: + resolved_weight_matrix_path = self._download_asset( + bucket=spec.bucket, + filename=spec.weight_matrix_filename, + explicit_path=weight_matrix_path, + ) + resolved_lookup_csv_path = self._download_asset( + bucket=spec.bucket, + filename=spec.lookup_csv_filename, + explicit_path=lookup_csv_path, + ) + except Exception as exc: + self.last_error = f"GCS download failed for {spec.geography_type}: {exc}" + return None + + self.last_error = None + return UKGeographyAssetPaths( + weight_matrix_path=str(resolved_weight_matrix_path), + lookup_csv_path=str(resolved_lookup_csv_path), + ) + + +def default_uk_geography_asset_strategies() -> list[UKGeographyAssetStrategy]: + """Return the default local-first, GCS-fallback strategy chain.""" + return [LocalUKGeographyAssetStrategy(), GCSUKGeographyAssetStrategy()] + + +def resolve_uk_geography_asset_paths( + spec: UKGeographyAssetSpec, + *, + weight_matrix_path: Optional[str] = None, + lookup_csv_path: Optional[str] = None, + asset_strategies: Optional[Sequence[UKGeographyAssetStrategy]] = None, +) -> UKGeographyAssetPaths: + """Resolve required UK geography files with local lookup, then GCS fallback.""" + strategies = ( + list(asset_strategies) + if asset_strategies is not None + else default_uk_geography_asset_strategies() + ) + + errors = [] + for strategy in strategies: + paths = strategy.resolve( + spec, + weight_matrix_path=weight_matrix_path, + lookup_csv_path=lookup_csv_path, + ) + if paths is not None: + return paths + if strategy.last_error: + errors.append(f"{strategy.__class__.__name__}: {strategy.last_error}") + + detail = "; ".join(errors) if errors else "no asset strategies configured" + raise FileNotFoundError( + f"Unable to resolve UK {spec.geography_type} geography assets " + f"({spec.weight_matrix_filename}, {spec.lookup_csv_filename}). {detail}. " + f"Set {UK_GEOGRAPHY_DATA_DIR_ENV} or provide explicit paths." + ) diff --git a/tests/test_constituency_impact.py b/tests/test_constituency_impact.py index f29e24b6..08d47a22 100644 --- a/tests/test_constituency_impact.py +++ b/tests/test_constituency_impact.py @@ -12,6 +12,10 @@ from policyengine.outputs.constituency_impact import ( compute_uk_constituency_impacts, ) +from policyengine.outputs.uk_geography_assets import ( + CONSTITUENCY_ASSET_SPEC, + LocalUKGeographyAssetStrategy, +) def _make_sim(household_data: dict) -> MagicMock: @@ -148,3 +152,43 @@ def test_relative_change(): abs(impact.constituency_results[0]["relative_household_income_change"] - 0.1) < 1e-6 ) + + +def test_compute_resolves_standard_constituency_assets_from_local_dir(): + """The helper can run without explicit asset paths when standard files exist.""" + baseline = _make_sim( + { + "household_net_income": [100.0, 200.0], + "household_weight": [1.0, 1.0], + } + ) + reform = _make_sim( + { + "household_net_income": [110.0, 220.0], + "household_weight": [1.0, 1.0], + } + ) + + weight_matrix = [[1.0, 0.0], [0.0, 1.0]] + csv_rows = [ + {"code": "C001", "name": "A", "x": 0, "y": 0}, + {"code": "C002", "name": "B", "x": 1, "y": 1}, + ] + + with tempfile.TemporaryDirectory() as tmpdir: + h5_path = os.path.join(tmpdir, CONSTITUENCY_ASSET_SPEC.weight_matrix_filename) + with h5py.File(h5_path, "w") as f: + f.create_dataset("2025", data=np.array(weight_matrix, dtype=np.float64)) + + csv_path = os.path.join(tmpdir, CONSTITUENCY_ASSET_SPEC.lookup_csv_filename) + pd.DataFrame(csv_rows).to_csv(csv_path, index=False) + + impact = compute_uk_constituency_impacts( + baseline, + reform, + asset_strategies=[LocalUKGeographyAssetStrategy(search_dirs=[tmpdir])], + ) + + assert impact.weight_matrix_path == h5_path + assert impact.constituency_csv_path == csv_path + assert len(impact.constituency_results) == 2 diff --git a/tests/test_local_authority_impact.py b/tests/test_local_authority_impact.py index a872262e..ba809076 100644 --- a/tests/test_local_authority_impact.py +++ b/tests/test_local_authority_impact.py @@ -12,6 +12,10 @@ from policyengine.outputs.local_authority_impact import ( compute_uk_local_authority_impacts, ) +from policyengine.outputs.uk_geography_assets import ( + LOCAL_AUTHORITY_ASSET_SPEC, + LocalUKGeographyAssetStrategy, +) def _make_sim(household_data: dict) -> MagicMock: @@ -106,3 +110,46 @@ def test_zero_weight_la_skipped(): assert len(impact.local_authority_results) == 1 assert impact.local_authority_results[0]["local_authority_code"] == "LA001" + + +def test_compute_resolves_standard_local_authority_assets_from_local_dir(): + """The helper can run without explicit asset paths when standard files exist.""" + baseline = _make_sim( + { + "household_net_income": [100.0, 200.0], + "household_weight": [1.0, 1.0], + } + ) + reform = _make_sim( + { + "household_net_income": [115.0, 230.0], + "household_weight": [1.0, 1.0], + } + ) + + weight_matrix = [[1.0, 0.0], [0.0, 1.0]] + csv_rows = [ + {"code": "LA001", "name": "A", "x": 0, "y": 0}, + {"code": "LA002", "name": "B", "x": 1, "y": 1}, + ] + + with tempfile.TemporaryDirectory() as tmpdir: + h5_path = os.path.join( + tmpdir, + LOCAL_AUTHORITY_ASSET_SPEC.weight_matrix_filename, + ) + with h5py.File(h5_path, "w") as f: + f.create_dataset("2025", data=np.array(weight_matrix, dtype=np.float64)) + + csv_path = os.path.join(tmpdir, LOCAL_AUTHORITY_ASSET_SPEC.lookup_csv_filename) + pd.DataFrame(csv_rows).to_csv(csv_path, index=False) + + impact = compute_uk_local_authority_impacts( + baseline, + reform, + asset_strategies=[LocalUKGeographyAssetStrategy(search_dirs=[tmpdir])], + ) + + assert impact.weight_matrix_path == h5_path + assert impact.local_authority_csv_path == csv_path + assert len(impact.local_authority_results) == 2 diff --git a/tests/test_uk_geography_assets.py b/tests/test_uk_geography_assets.py new file mode 100644 index 00000000..7ffb4e2a --- /dev/null +++ b/tests/test_uk_geography_assets.py @@ -0,0 +1,102 @@ +"""Tests for UK geography asset resolution.""" + +from pathlib import Path +from unittest.mock import patch + +import pytest + +from policyengine.outputs.uk_geography_assets import ( + CONSTITUENCY_ASSET_SPEC, + LOCAL_AUTHORITY_ASSET_SPEC, + GCSUKGeographyAssetStrategy, + LocalUKGeographyAssetStrategy, + resolve_uk_geography_asset_paths, +) + + +def _touch(path: Path) -> None: + path.write_text("test asset") + + +def test_local_strategy_resolves_explicit_paths(tmp_path): + weight_matrix_path = tmp_path / "custom_weights.h5" + lookup_csv_path = tmp_path / "custom_lookup.csv" + _touch(weight_matrix_path) + _touch(lookup_csv_path) + + paths = resolve_uk_geography_asset_paths( + CONSTITUENCY_ASSET_SPEC, + weight_matrix_path=str(weight_matrix_path), + lookup_csv_path=str(lookup_csv_path), + asset_strategies=[LocalUKGeographyAssetStrategy(search_dirs=[])], + ) + + assert paths.weight_matrix_path == str(weight_matrix_path) + assert paths.lookup_csv_path == str(lookup_csv_path) + + +def test_local_strategy_resolves_standard_files_from_search_dir(tmp_path): + weight_matrix_path = tmp_path / LOCAL_AUTHORITY_ASSET_SPEC.weight_matrix_filename + lookup_csv_path = tmp_path / LOCAL_AUTHORITY_ASSET_SPEC.lookup_csv_filename + _touch(weight_matrix_path) + _touch(lookup_csv_path) + + paths = resolve_uk_geography_asset_paths( + LOCAL_AUTHORITY_ASSET_SPEC, + asset_strategies=[LocalUKGeographyAssetStrategy(search_dirs=[tmp_path])], + ) + + assert paths.weight_matrix_path == str(weight_matrix_path) + assert paths.lookup_csv_path == str(lookup_csv_path) + + +def test_gcs_strategy_downloads_missing_standard_files(tmp_path): + download_dir = tmp_path / "downloads" + + def fake_download_gcs_file(*, bucket, file_path, local_path, version=None): + del version + assert bucket == CONSTITUENCY_ASSET_SPEC.bucket + Path(local_path).write_text(file_path) + return local_path + + with patch( + "policyengine_core.tools.google_cloud.download_gcs_file", + side_effect=fake_download_gcs_file, + ) as download_gcs_file: + paths = resolve_uk_geography_asset_paths( + CONSTITUENCY_ASSET_SPEC, + asset_strategies=[ + LocalUKGeographyAssetStrategy(search_dirs=[tmp_path / "missing"]), + GCSUKGeographyAssetStrategy(download_dir=download_dir), + ], + ) + + assert paths.weight_matrix_path == str( + download_dir / CONSTITUENCY_ASSET_SPEC.weight_matrix_filename + ) + assert paths.lookup_csv_path == str( + download_dir / CONSTITUENCY_ASSET_SPEC.lookup_csv_filename + ) + assert download_gcs_file.call_count == 2 + assert {call.kwargs["file_path"] for call in download_gcs_file.call_args_list} == { + CONSTITUENCY_ASSET_SPEC.weight_matrix_filename, + CONSTITUENCY_ASSET_SPEC.lookup_csv_filename, + } + + +def test_resolver_raises_clear_error_when_no_strategy_succeeds(): + class MissingStrategy(LocalUKGeographyAssetStrategy): + def __init__(self): + super().__init__(search_dirs=[]) + + with pytest.raises(FileNotFoundError) as exc_info: + resolve_uk_geography_asset_paths( + CONSTITUENCY_ASSET_SPEC, + asset_strategies=[MissingStrategy()], + ) + + message = str(exc_info.value) + assert "Unable to resolve UK constituency geography assets" in message + assert CONSTITUENCY_ASSET_SPEC.weight_matrix_filename in message + assert CONSTITUENCY_ASSET_SPEC.lookup_csv_filename in message + assert "POLICYENGINE_UK_GEOGRAPHY_DATA_DIR" in message From aa316adc67a50e6fffb2d367931430048a7d3170 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Thu, 21 May 2026 16:59:55 +0200 Subject: [PATCH 2/5] Address UK geography asset review feedback --- src/policyengine/countries/uk/regions.py | 31 ++++--- src/policyengine/data/__init__.py | 1 + src/policyengine/data/uk_geography_assets.py | 29 +++++++ .../outputs/uk_geography_assets.py | 65 +++++++++------ tests/test_constituency_impact.py | 22 +++-- tests/test_local_authority_impact.py | 22 +++-- tests/test_uk_geography_assets.py | 22 +++++ tests/test_uk_regions.py | 83 ++++++++++++++++++- 8 files changed, 221 insertions(+), 54 deletions(-) create mode 100644 src/policyengine/data/__init__.py create mode 100644 src/policyengine/data/uk_geography_assets.py diff --git a/src/policyengine/countries/uk/regions.py b/src/policyengine/countries/uk/regions.py index 32430d48..7aceabd2 100644 --- a/src/policyengine/countries/uk/regions.py +++ b/src/policyengine/countries/uk/regions.py @@ -19,6 +19,11 @@ RowFilterStrategy, WeightReplacementStrategy, ) +from policyengine.data.uk_geography_assets import ( + CONSTITUENCY_ASSET_SPEC, + LOCAL_AUTHORITY_ASSET_SPEC, + UK_GEOGRAPHY_BUCKET_URI, +) from policyengine.provenance.manifest import resolve_region_dataset_path if TYPE_CHECKING: @@ -26,7 +31,7 @@ logger = logging.getLogger(__name__) -UK_DATA_BUCKET = "gs://policyengine-uk-data-private" +UK_DATA_BUCKET = UK_GEOGRAPHY_BUCKET_URI # UK countries UK_COUNTRIES = { @@ -54,8 +59,8 @@ def _load_constituencies_from_csv() -> list[dict]: try: csv_path = download( - gcs_bucket="policyengine-uk-data-private", - gcs_key="constituencies_2024.csv", + gcs_bucket=CONSTITUENCY_ASSET_SPEC.bucket, + gcs_key=CONSTITUENCY_ASSET_SPEC.lookup_csv_filename, ) import pandas as pd @@ -86,8 +91,8 @@ def _load_local_authorities_from_csv() -> list[dict]: try: csv_path = download( - gcs_bucket="policyengine-uk-data-private", - gcs_key="local_authorities_2021.csv", + gcs_bucket=LOCAL_AUTHORITY_ASSET_SPEC.bucket, + gcs_key=LOCAL_AUTHORITY_ASSET_SPEC.lookup_csv_filename, ) import pandas as pd @@ -159,10 +164,10 @@ def build_uk_region_registry( region_type="constituency", parent_code="uk", scoping_strategy=WeightReplacementStrategy( - weight_matrix_bucket="policyengine-uk-data-private", - weight_matrix_key="parliamentary_constituency_weights.h5", - lookup_csv_bucket="policyengine-uk-data-private", - lookup_csv_key="constituencies_2024.csv", + weight_matrix_bucket=CONSTITUENCY_ASSET_SPEC.bucket, + weight_matrix_key=CONSTITUENCY_ASSET_SPEC.weight_matrix_filename, + lookup_csv_bucket=CONSTITUENCY_ASSET_SPEC.bucket, + lookup_csv_key=CONSTITUENCY_ASSET_SPEC.lookup_csv_filename, region_code=const["code"], ), ) @@ -180,10 +185,10 @@ def build_uk_region_registry( region_type="local_authority", parent_code="uk", scoping_strategy=WeightReplacementStrategy( - weight_matrix_bucket="policyengine-uk-data-private", - weight_matrix_key="local_authority_weights.h5", - lookup_csv_bucket="policyengine-uk-data-private", - lookup_csv_key="local_authorities_2021.csv", + weight_matrix_bucket=LOCAL_AUTHORITY_ASSET_SPEC.bucket, + weight_matrix_key=LOCAL_AUTHORITY_ASSET_SPEC.weight_matrix_filename, + lookup_csv_bucket=LOCAL_AUTHORITY_ASSET_SPEC.bucket, + lookup_csv_key=LOCAL_AUTHORITY_ASSET_SPEC.lookup_csv_filename, region_code=la["code"], ), ) diff --git a/src/policyengine/data/__init__.py b/src/policyengine/data/__init__.py new file mode 100644 index 00000000..35dcb10d --- /dev/null +++ b/src/policyengine/data/__init__.py @@ -0,0 +1 @@ +"""Static metadata and packaged data helpers.""" diff --git a/src/policyengine/data/uk_geography_assets.py b/src/policyengine/data/uk_geography_assets.py new file mode 100644 index 00000000..f26e14e3 --- /dev/null +++ b/src/policyengine/data/uk_geography_assets.py @@ -0,0 +1,29 @@ +"""Canonical UK geography asset metadata.""" + +from dataclasses import dataclass + +UK_GEOGRAPHY_BUCKET = "policyengine-uk-data-private" +UK_GEOGRAPHY_BUCKET_URI = f"gs://{UK_GEOGRAPHY_BUCKET}" + + +@dataclass(frozen=True) +class UKGeographyAssetSpec: + """The paired files needed to compute one UK geography output.""" + + geography_type: str + weight_matrix_filename: str + lookup_csv_filename: str + bucket: str = UK_GEOGRAPHY_BUCKET + + +CONSTITUENCY_ASSET_SPEC = UKGeographyAssetSpec( + geography_type="constituency", + weight_matrix_filename="parliamentary_constituency_weights.h5", + lookup_csv_filename="constituencies_2024.csv", +) + +LOCAL_AUTHORITY_ASSET_SPEC = UKGeographyAssetSpec( + geography_type="local_authority", + weight_matrix_filename="local_authority_weights.h5", + lookup_csv_filename="local_authorities_2021.csv", +) diff --git a/src/policyengine/outputs/uk_geography_assets.py b/src/policyengine/outputs/uk_geography_assets.py index f66be478..b63d551b 100644 --- a/src/policyengine/outputs/uk_geography_assets.py +++ b/src/policyengine/outputs/uk_geography_assets.py @@ -7,19 +7,28 @@ from pathlib import Path from typing import Optional, Sequence, Union -UK_GEOGRAPHY_BUCKET = "policyengine-uk-data-private" -UK_GEOGRAPHY_DATA_DIR_ENV = "POLICYENGINE_UK_GEOGRAPHY_DATA_DIR" -POLICYENGINE_DATA_FOLDER_ENV = "POLICYENGINE_DATA_FOLDER" - +from policyengine.data.uk_geography_assets import ( + CONSTITUENCY_ASSET_SPEC, + LOCAL_AUTHORITY_ASSET_SPEC, + UK_GEOGRAPHY_BUCKET, + UKGeographyAssetSpec, +) -@dataclass(frozen=True) -class UKGeographyAssetSpec: - """The paired files needed to compute one UK geography output.""" +__all__ = [ + "CONSTITUENCY_ASSET_SPEC", + "LOCAL_AUTHORITY_ASSET_SPEC", + "UK_GEOGRAPHY_BUCKET", + "UKGeographyAssetPaths", + "UKGeographyAssetSpec", + "UKGeographyAssetStrategy", + "LocalUKGeographyAssetStrategy", + "GCSUKGeographyAssetStrategy", + "default_uk_geography_asset_strategies", + "resolve_uk_geography_asset_paths", +] - geography_type: str - weight_matrix_filename: str - lookup_csv_filename: str - bucket: str = UK_GEOGRAPHY_BUCKET +UK_GEOGRAPHY_DATA_DIR_ENV = "POLICYENGINE_UK_GEOGRAPHY_DATA_DIR" +POLICYENGINE_DATA_FOLDER_ENV = "POLICYENGINE_DATA_FOLDER" @dataclass(frozen=True) @@ -30,19 +39,6 @@ class UKGeographyAssetPaths: lookup_csv_path: str -CONSTITUENCY_ASSET_SPEC = UKGeographyAssetSpec( - geography_type="constituency", - weight_matrix_filename="parliamentary_constituency_weights.h5", - lookup_csv_filename="constituencies_2024.csv", -) - -LOCAL_AUTHORITY_ASSET_SPEC = UKGeographyAssetSpec( - geography_type="local_authority", - weight_matrix_filename="local_authority_weights.h5", - lookup_csv_filename="local_authorities_2021.csv", -) - - def _env_path(name: str) -> Optional[Path]: value = os.environ.get(name) if not value: @@ -83,6 +79,18 @@ def default_download_dir() -> Path: return Path.home() / ".policyengine" / "uk-geography" +def _validate_explicit_path(path: Optional[str], *, asset_label: str) -> None: + if not path: + return + + candidate = Path(path).expanduser() + if not candidate.is_file(): + raise FileNotFoundError( + f"Provided UK geography {asset_label} path does not exist " + f"or is not a file: {candidate}" + ) + + class UKGeographyAssetStrategy: """Base class for UK geography asset resolution strategies.""" @@ -247,6 +255,15 @@ def resolve_uk_geography_asset_paths( asset_strategies: Optional[Sequence[UKGeographyAssetStrategy]] = None, ) -> UKGeographyAssetPaths: """Resolve required UK geography files with local lookup, then GCS fallback.""" + _validate_explicit_path( + weight_matrix_path, + asset_label=f"{spec.geography_type} weight matrix", + ) + _validate_explicit_path( + lookup_csv_path, + asset_label=f"{spec.geography_type} lookup CSV", + ) + strategies = ( list(asset_strategies) if asset_strategies is not None diff --git a/tests/test_constituency_impact.py b/tests/test_constituency_impact.py index 08d47a22..99ad3211 100644 --- a/tests/test_constituency_impact.py +++ b/tests/test_constituency_impact.py @@ -2,7 +2,7 @@ import os import tempfile -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import h5py import numpy as np @@ -14,7 +14,6 @@ ) from policyengine.outputs.uk_geography_assets import ( CONSTITUENCY_ASSET_SPEC, - LocalUKGeographyAssetStrategy, ) @@ -154,7 +153,9 @@ def test_relative_change(): ) -def test_compute_resolves_standard_constituency_assets_from_local_dir(): +def test_compute_resolves_standard_constituency_assets_from_default_local_dir( + monkeypatch, +): """The helper can run without explicit asset paths when standard files exist.""" baseline = _make_sim( { @@ -183,11 +184,16 @@ def test_compute_resolves_standard_constituency_assets_from_local_dir(): csv_path = os.path.join(tmpdir, CONSTITUENCY_ASSET_SPEC.lookup_csv_filename) pd.DataFrame(csv_rows).to_csv(csv_path, index=False) - impact = compute_uk_constituency_impacts( - baseline, - reform, - asset_strategies=[LocalUKGeographyAssetStrategy(search_dirs=[tmpdir])], - ) + monkeypatch.setenv("POLICYENGINE_UK_GEOGRAPHY_DATA_DIR", tmpdir) + with patch( + "policyengine_core.tools.google_cloud.download_gcs_file" + ) as download: + impact = compute_uk_constituency_impacts( + baseline, + reform, + ) + + download.assert_not_called() assert impact.weight_matrix_path == h5_path assert impact.constituency_csv_path == csv_path diff --git a/tests/test_local_authority_impact.py b/tests/test_local_authority_impact.py index ba809076..f0e19fe3 100644 --- a/tests/test_local_authority_impact.py +++ b/tests/test_local_authority_impact.py @@ -2,7 +2,7 @@ import os import tempfile -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import h5py import numpy as np @@ -14,7 +14,6 @@ ) from policyengine.outputs.uk_geography_assets import ( LOCAL_AUTHORITY_ASSET_SPEC, - LocalUKGeographyAssetStrategy, ) @@ -112,7 +111,9 @@ def test_zero_weight_la_skipped(): assert impact.local_authority_results[0]["local_authority_code"] == "LA001" -def test_compute_resolves_standard_local_authority_assets_from_local_dir(): +def test_compute_resolves_standard_local_authority_assets_from_default_local_dir( + monkeypatch, +): """The helper can run without explicit asset paths when standard files exist.""" baseline = _make_sim( { @@ -144,11 +145,16 @@ def test_compute_resolves_standard_local_authority_assets_from_local_dir(): csv_path = os.path.join(tmpdir, LOCAL_AUTHORITY_ASSET_SPEC.lookup_csv_filename) pd.DataFrame(csv_rows).to_csv(csv_path, index=False) - impact = compute_uk_local_authority_impacts( - baseline, - reform, - asset_strategies=[LocalUKGeographyAssetStrategy(search_dirs=[tmpdir])], - ) + monkeypatch.setenv("POLICYENGINE_UK_GEOGRAPHY_DATA_DIR", tmpdir) + with patch( + "policyengine_core.tools.google_cloud.download_gcs_file" + ) as download: + impact = compute_uk_local_authority_impacts( + baseline, + reform, + ) + + download.assert_not_called() assert impact.weight_matrix_path == h5_path assert impact.local_authority_csv_path == csv_path diff --git a/tests/test_uk_geography_assets.py b/tests/test_uk_geography_assets.py index 7ffb4e2a..aa20ee70 100644 --- a/tests/test_uk_geography_assets.py +++ b/tests/test_uk_geography_assets.py @@ -10,6 +10,7 @@ LOCAL_AUTHORITY_ASSET_SPEC, GCSUKGeographyAssetStrategy, LocalUKGeographyAssetStrategy, + UKGeographyAssetStrategy, resolve_uk_geography_asset_paths, ) @@ -84,6 +85,27 @@ def fake_download_gcs_file(*, bucket, file_path, local_path, version=None): } +def test_resolver_rejects_missing_explicit_path_before_strategies(tmp_path): + class ShouldNotRunStrategy(UKGeographyAssetStrategy): + def resolve(self, *args, **kwargs): + raise AssertionError("Strategy should not run for a missing explicit path") + + missing_weight_matrix_path = tmp_path / "missing_weights.h5" + lookup_csv_path = tmp_path / "lookup.csv" + _touch(lookup_csv_path) + + with pytest.raises(FileNotFoundError) as exc_info: + resolve_uk_geography_asset_paths( + CONSTITUENCY_ASSET_SPEC, + weight_matrix_path=str(missing_weight_matrix_path), + lookup_csv_path=str(lookup_csv_path), + asset_strategies=[ShouldNotRunStrategy()], + ) + + assert "constituency weight matrix" in str(exc_info.value) + assert str(missing_weight_matrix_path) in str(exc_info.value) + + def test_resolver_raises_clear_error_when_no_strategy_succeeds(): class MissingStrategy(LocalUKGeographyAssetStrategy): def __init__(self): diff --git a/tests/test_uk_regions.py b/tests/test_uk_regions.py index 49fb1eb1..09db51b4 100644 --- a/tests/test_uk_regions.py +++ b/tests/test_uk_regions.py @@ -1,11 +1,20 @@ """Tests for UK region definitions.""" -from policyengine.core.scoping_strategy import RowFilterStrategy +from unittest.mock import patch + +from policyengine.core.scoping_strategy import ( + RowFilterStrategy, + WeightReplacementStrategy, +) from policyengine.countries.uk.regions import ( UK_COUNTRIES, build_uk_region_registry, uk_region_registry, ) +from policyengine.data.uk_geography_assets import ( + CONSTITUENCY_ASSET_SPEC, + LOCAL_AUTHORITY_ASSET_SPEC, +) class TestUKCountries: @@ -243,3 +252,75 @@ def test__given_builder__then_accepts_include_local_authorities_flag(self): # Then assert registry is not None assert len(registry.get_by_type("local_authority")) == 0 + + @patch( + "policyengine.countries.uk.regions._load_constituencies_from_csv", + return_value=[{"code": "C001", "name": "Constituency A"}], + ) + def test__given_constituencies_included__then_uses_canonical_assets( + self, + _mock_loader, + ): + """Given: constituencies are included + When: Building the registry + Then: Weight replacement strategy uses canonical constituency assets + """ + # When + registry = build_uk_region_registry(include_constituencies=True) + constituency = registry.get("constituency/C001") + + # Then + assert constituency is not None + assert isinstance(constituency.scoping_strategy, WeightReplacementStrategy) + assert ( + constituency.scoping_strategy.weight_matrix_bucket + == CONSTITUENCY_ASSET_SPEC.bucket + ) + assert ( + constituency.scoping_strategy.weight_matrix_key + == CONSTITUENCY_ASSET_SPEC.weight_matrix_filename + ) + assert ( + constituency.scoping_strategy.lookup_csv_bucket + == CONSTITUENCY_ASSET_SPEC.bucket + ) + assert ( + constituency.scoping_strategy.lookup_csv_key + == CONSTITUENCY_ASSET_SPEC.lookup_csv_filename + ) + + @patch( + "policyengine.countries.uk.regions._load_local_authorities_from_csv", + return_value=[{"code": "LA001", "name": "Local Authority A"}], + ) + def test__given_local_authorities_included__then_uses_canonical_assets( + self, + _mock_loader, + ): + """Given: local authorities are included + When: Building the registry + Then: Weight replacement strategy uses canonical local-authority assets + """ + # When + registry = build_uk_region_registry(include_local_authorities=True) + local_authority = registry.get("local_authority/LA001") + + # Then + assert local_authority is not None + assert isinstance(local_authority.scoping_strategy, WeightReplacementStrategy) + assert ( + local_authority.scoping_strategy.weight_matrix_bucket + == LOCAL_AUTHORITY_ASSET_SPEC.bucket + ) + assert ( + local_authority.scoping_strategy.weight_matrix_key + == LOCAL_AUTHORITY_ASSET_SPEC.weight_matrix_filename + ) + assert ( + local_authority.scoping_strategy.lookup_csv_bucket + == LOCAL_AUTHORITY_ASSET_SPEC.bucket + ) + assert ( + local_authority.scoping_strategy.lookup_csv_key + == LOCAL_AUTHORITY_ASSET_SPEC.lookup_csv_filename + ) From 38feccec5ec3efa44d2de2e9cc1e55b3c2332549 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Thu, 21 May 2026 18:45:49 +0200 Subject: [PATCH 3/5] Add local-only UK geography asset option --- docs/outputs.md | 2 +- docs/regions.md | 4 ++-- .../outputs/constituency_impact.py | 8 +++++-- .../outputs/local_authority_impact.py | 8 +++++-- .../outputs/uk_geography_assets.py | 22 ++++++++++++++----- tests/test_uk_geography_assets.py | 20 +++++++++++++++++ 6 files changed, 52 insertions(+), 12 deletions(-) diff --git a/docs/outputs.md b/docs/outputs.md index 2f2f2437..27239ff2 100644 --- a/docs/outputs.md +++ b/docs/outputs.md @@ -255,7 +255,7 @@ impacts = compute_uk_constituency_impacts( impacts.constituency_results ``` -`compute_uk_local_authority_impacts` follows the same pattern. Pass explicit paths to bypass the resolver, or set `POLICYENGINE_UK_GEOGRAPHY_DATA_DIR` to choose the local lookup and download cache directory. See [Regions](regions.md). +`compute_uk_local_authority_impacts` follows the same pattern. Pass explicit paths to bypass the resolver; missing explicit paths raise `FileNotFoundError` without falling back to GCS. Pass `download_missing_assets=False` to require the canonical files to exist locally or in the cache. Set `POLICYENGINE_UK_GEOGRAPHY_DATA_DIR` to choose the local lookup and download cache directory. See [Regions](regions.md). ## Writing your own diff --git a/docs/regions.md b/docs/regions.md index 70b52a02..06198a43 100644 --- a/docs/regions.md +++ b/docs/regions.md @@ -61,7 +61,7 @@ impacts = compute_uk_constituency_impacts( impacts.constituency_results ``` -To force local files, pass `weight_matrix_path` and `constituency_csv_path`. To set a reusable local data directory, set `POLICYENGINE_UK_GEOGRAPHY_DATA_DIR`. +To force specific local files, pass `weight_matrix_path` and `constituency_csv_path`. If either provided path is missing, the helper raises `FileNotFoundError` and does not fall back to GCS. To require the canonical files to be available locally or in the cache, pass `download_missing_assets=False`. To set a reusable local data directory and download cache, set `POLICYENGINE_UK_GEOGRAPHY_DATA_DIR`. ## UK local authorities @@ -76,7 +76,7 @@ impacts = compute_uk_local_authority_impacts( impacts.local_authority_results ``` -`compute_uk_local_authority_impacts` accepts explicit paths with `weight_matrix_path` and `local_authority_csv_path` when callers need to bypass the default local-first, GCS-fallback resolver. +`compute_uk_local_authority_impacts` accepts explicit paths with `weight_matrix_path` and `local_authority_csv_path` when callers need to bypass the default local-first, GCS-fallback resolver. It also accepts `download_missing_assets=False` for local-only canonical asset resolution. ## Region registries diff --git a/src/policyengine/outputs/constituency_impact.py b/src/policyengine/outputs/constituency_impact.py index 3fe1ae4b..adde6095 100644 --- a/src/policyengine/outputs/constituency_impact.py +++ b/src/policyengine/outputs/constituency_impact.py @@ -109,6 +109,7 @@ def compute_uk_constituency_impacts( constituency_csv_path: Optional[str] = None, year: str = "2025", asset_strategies: Optional[Sequence[UKGeographyAssetStrategy]] = None, + download_missing_assets: bool = True, ) -> ConstituencyImpact: """Compute per-constituency income changes for UK. @@ -120,8 +121,10 @@ def compute_uk_constituency_impacts( constituency_csv_path: Optional path to constituencies_2024.csv. If omitted, standard local paths are checked before downloading from GCS. year: Year key in the H5 file (default "2025"). - asset_strategies: Optional resolver strategy chain. Defaults to local lookup, - then GCS download. + asset_strategies: Optional resolver strategy chain. If omitted, defaults to + local lookup, then optional GCS download. + download_missing_assets: Whether to download canonical missing assets from GCS. + Set to False to require local/cache files. Returns: ConstituencyImpact with constituency_results populated. @@ -131,6 +134,7 @@ def compute_uk_constituency_impacts( weight_matrix_path=weight_matrix_path, lookup_csv_path=constituency_csv_path, asset_strategies=asset_strategies, + download_missing_assets=download_missing_assets, ) impact = ConstituencyImpact.model_construct( baseline_simulation=baseline_simulation, diff --git a/src/policyengine/outputs/local_authority_impact.py b/src/policyengine/outputs/local_authority_impact.py index 7b36c17c..a6aa244a 100644 --- a/src/policyengine/outputs/local_authority_impact.py +++ b/src/policyengine/outputs/local_authority_impact.py @@ -108,6 +108,7 @@ def compute_uk_local_authority_impacts( local_authority_csv_path: Optional[str] = None, year: str = "2025", asset_strategies: Optional[Sequence[UKGeographyAssetStrategy]] = None, + download_missing_assets: bool = True, ) -> LocalAuthorityImpact: """Compute per-local-authority income changes for UK. @@ -119,8 +120,10 @@ def compute_uk_local_authority_impacts( local_authority_csv_path: Optional path to local_authorities_2021.csv. If omitted, standard local paths are checked before downloading from GCS. year: Year key in the H5 file (default "2025"). - asset_strategies: Optional resolver strategy chain. Defaults to local lookup, - then GCS download. + asset_strategies: Optional resolver strategy chain. If omitted, defaults to + local lookup, then optional GCS download. + download_missing_assets: Whether to download canonical missing assets from GCS. + Set to False to require local/cache files. Returns: LocalAuthorityImpact with local_authority_results populated. @@ -130,6 +133,7 @@ def compute_uk_local_authority_impacts( weight_matrix_path=weight_matrix_path, lookup_csv_path=local_authority_csv_path, asset_strategies=asset_strategies, + download_missing_assets=download_missing_assets, ) impact = LocalAuthorityImpact.model_construct( baseline_simulation=baseline_simulation, diff --git a/src/policyengine/outputs/uk_geography_assets.py b/src/policyengine/outputs/uk_geography_assets.py index b63d551b..52a64a29 100644 --- a/src/policyengine/outputs/uk_geography_assets.py +++ b/src/policyengine/outputs/uk_geography_assets.py @@ -63,6 +63,7 @@ def default_local_search_dirs() -> list[Path]: candidates = [ _env_path(UK_GEOGRAPHY_DATA_DIR_ENV), _env_path(POLICYENGINE_DATA_FOLDER_ENV), + Path.home() / ".policyengine" / "uk-geography", Path(".datasets"), Path.cwd(), ] @@ -242,9 +243,15 @@ def resolve( ) -def default_uk_geography_asset_strategies() -> list[UKGeographyAssetStrategy]: - """Return the default local-first, GCS-fallback strategy chain.""" - return [LocalUKGeographyAssetStrategy(), GCSUKGeographyAssetStrategy()] +def default_uk_geography_asset_strategies( + *, + download_missing_assets: bool = True, +) -> list[UKGeographyAssetStrategy]: + """Return the default asset resolution strategy chain.""" + strategies: list[UKGeographyAssetStrategy] = [LocalUKGeographyAssetStrategy()] + if download_missing_assets: + strategies.append(GCSUKGeographyAssetStrategy()) + return strategies def resolve_uk_geography_asset_paths( @@ -253,8 +260,9 @@ def resolve_uk_geography_asset_paths( weight_matrix_path: Optional[str] = None, lookup_csv_path: Optional[str] = None, asset_strategies: Optional[Sequence[UKGeographyAssetStrategy]] = None, + download_missing_assets: bool = True, ) -> UKGeographyAssetPaths: - """Resolve required UK geography files with local lookup, then GCS fallback.""" + """Resolve required UK geography files with optional GCS fallback.""" _validate_explicit_path( weight_matrix_path, asset_label=f"{spec.geography_type} weight matrix", @@ -267,7 +275,9 @@ def resolve_uk_geography_asset_paths( strategies = ( list(asset_strategies) if asset_strategies is not None - else default_uk_geography_asset_strategies() + else default_uk_geography_asset_strategies( + download_missing_assets=download_missing_assets, + ) ) errors = [] @@ -283,6 +293,8 @@ def resolve_uk_geography_asset_paths( errors.append(f"{strategy.__class__.__name__}: {strategy.last_error}") detail = "; ".join(errors) if errors else "no asset strategies configured" + if not download_missing_assets and asset_strategies is None: + detail += "; GCS fallback disabled by download_missing_assets=False" raise FileNotFoundError( f"Unable to resolve UK {spec.geography_type} geography assets " f"({spec.weight_matrix_filename}, {spec.lookup_csv_filename}). {detail}. " diff --git a/tests/test_uk_geography_assets.py b/tests/test_uk_geography_assets.py index aa20ee70..89f9ed68 100644 --- a/tests/test_uk_geography_assets.py +++ b/tests/test_uk_geography_assets.py @@ -10,6 +10,7 @@ LOCAL_AUTHORITY_ASSET_SPEC, GCSUKGeographyAssetStrategy, LocalUKGeographyAssetStrategy, + UKGeographyAssetSpec, UKGeographyAssetStrategy, resolve_uk_geography_asset_paths, ) @@ -85,6 +86,25 @@ def fake_download_gcs_file(*, bucket, file_path, local_path, version=None): } +def test_resolver_local_only_does_not_download_missing_standard_files(): + spec = UKGeographyAssetSpec( + geography_type="test", + weight_matrix_filename="missing-test-weights.h5", + lookup_csv_filename="missing-test-lookup.csv", + ) + + with patch("policyengine_core.tools.google_cloud.download_gcs_file") as download: + with pytest.raises(FileNotFoundError) as exc_info: + resolve_uk_geography_asset_paths( + spec, + download_missing_assets=False, + ) + + download.assert_not_called() + assert "Unable to resolve UK test geography assets" in str(exc_info.value) + assert spec.weight_matrix_filename in str(exc_info.value) + + def test_resolver_rejects_missing_explicit_path_before_strategies(tmp_path): class ShouldNotRunStrategy(UKGeographyAssetStrategy): def resolve(self, *args, **kwargs): From 137ed72e6a2299b4bbd93642d29ce102dc3e1497 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Thu, 21 May 2026 18:55:53 +0200 Subject: [PATCH 4/5] Strengthen UK geography local-only resolution --- docs/outputs.md | 2 +- docs/regions.md | 2 +- .../outputs/uk_geography_assets.py | 17 ++++++++- tests/test_constituency_impact.py | 35 +++++++++++++++++++ tests/test_local_authority_impact.py | 35 +++++++++++++++++++ tests/test_uk_geography_assets.py | 12 +++++++ 6 files changed, 100 insertions(+), 3 deletions(-) diff --git a/docs/outputs.md b/docs/outputs.md index 27239ff2..d2fffbdf 100644 --- a/docs/outputs.md +++ b/docs/outputs.md @@ -255,7 +255,7 @@ impacts = compute_uk_constituency_impacts( impacts.constituency_results ``` -`compute_uk_local_authority_impacts` follows the same pattern. Pass explicit paths to bypass the resolver; missing explicit paths raise `FileNotFoundError` without falling back to GCS. Pass `download_missing_assets=False` to require the canonical files to exist locally or in the cache. Set `POLICYENGINE_UK_GEOGRAPHY_DATA_DIR` to choose the local lookup and download cache directory. See [Regions](regions.md). +`compute_uk_local_authority_impacts` follows the same pattern. Pass explicit paths to use specific local files instead of the default local/GCS lookup; missing explicit paths raise `FileNotFoundError` without falling back to GCS. Pass `download_missing_assets=False` to require the canonical files to exist locally or in the cache. Set `POLICYENGINE_UK_GEOGRAPHY_DATA_DIR` to choose the local lookup and download cache directory. See [Regions](regions.md). ## Writing your own diff --git a/docs/regions.md b/docs/regions.md index 06198a43..df76e2bf 100644 --- a/docs/regions.md +++ b/docs/regions.md @@ -76,7 +76,7 @@ impacts = compute_uk_local_authority_impacts( impacts.local_authority_results ``` -`compute_uk_local_authority_impacts` accepts explicit paths with `weight_matrix_path` and `local_authority_csv_path` when callers need to bypass the default local-first, GCS-fallback resolver. It also accepts `download_missing_assets=False` for local-only canonical asset resolution. +`compute_uk_local_authority_impacts` accepts explicit paths with `weight_matrix_path` and `local_authority_csv_path` when callers need to use specific local files instead of the default local/GCS lookup. It also accepts `download_missing_assets=False` for local-only canonical asset resolution. ## Region registries diff --git a/src/policyengine/outputs/uk_geography_assets.py b/src/policyengine/outputs/uk_geography_assets.py index 52a64a29..4369710b 100644 --- a/src/policyengine/outputs/uk_geography_assets.py +++ b/src/policyengine/outputs/uk_geography_assets.py @@ -95,6 +95,7 @@ def _validate_explicit_path(path: Optional[str], *, asset_label: str) -> None: class UKGeographyAssetStrategy: """Base class for UK geography asset resolution strategies.""" + downloads_missing_assets = False last_error: Optional[str] = None def resolve( @@ -179,6 +180,8 @@ def resolve( class GCSUKGeographyAssetStrategy(UKGeographyAssetStrategy): """Download geography assets from the standard PolicyEngine UK GCS bucket.""" + downloads_missing_assets = True + def __init__(self, download_dir: Optional[PathLike] = None): self.download_dir = ( Path(download_dir).expanduser() @@ -279,6 +282,18 @@ def resolve_uk_geography_asset_paths( download_missing_assets=download_missing_assets, ) ) + if not download_missing_assets: + download_strategy_names = [ + strategy.__class__.__name__ + for strategy in strategies + if strategy.downloads_missing_assets + ] + if download_strategy_names: + raise ValueError( + "download_missing_assets=False cannot be used with asset " + "strategies that download missing assets: " + + ", ".join(download_strategy_names) + ) errors = [] for strategy in strategies: @@ -293,7 +308,7 @@ def resolve_uk_geography_asset_paths( errors.append(f"{strategy.__class__.__name__}: {strategy.last_error}") detail = "; ".join(errors) if errors else "no asset strategies configured" - if not download_missing_assets and asset_strategies is None: + if not download_missing_assets: detail += "; GCS fallback disabled by download_missing_assets=False" raise FileNotFoundError( f"Unable to resolve UK {spec.geography_type} geography assets " diff --git a/tests/test_constituency_impact.py b/tests/test_constituency_impact.py index 99ad3211..5ec98e99 100644 --- a/tests/test_constituency_impact.py +++ b/tests/test_constituency_impact.py @@ -7,6 +7,7 @@ import h5py import numpy as np import pandas as pd +import pytest from microdf import MicroDataFrame from policyengine.outputs.constituency_impact import ( @@ -198,3 +199,37 @@ def test_compute_resolves_standard_constituency_assets_from_default_local_dir( assert impact.weight_matrix_path == h5_path assert impact.constituency_csv_path == csv_path assert len(impact.constituency_results) == 2 + + +def test_compute_constituency_impacts_local_only_does_not_call_gcs(tmp_path): + baseline = _make_sim( + { + "household_net_income": [100.0], + "household_weight": [1.0], + } + ) + reform = _make_sim( + { + "household_net_income": [110.0], + "household_weight": [1.0], + } + ) + + with ( + patch( + "policyengine.outputs.uk_geography_assets.default_local_search_dirs", + return_value=[tmp_path / "missing"], + ), + patch("policyengine_core.tools.google_cloud.download_gcs_file") as download, + ): + with pytest.raises(FileNotFoundError) as exc_info: + compute_uk_constituency_impacts( + baseline, + reform, + download_missing_assets=False, + ) + + download.assert_not_called() + assert "GCS fallback disabled by download_missing_assets=False" in str( + exc_info.value + ) diff --git a/tests/test_local_authority_impact.py b/tests/test_local_authority_impact.py index f0e19fe3..a07b893b 100644 --- a/tests/test_local_authority_impact.py +++ b/tests/test_local_authority_impact.py @@ -7,6 +7,7 @@ import h5py import numpy as np import pandas as pd +import pytest from microdf import MicroDataFrame from policyengine.outputs.local_authority_impact import ( @@ -159,3 +160,37 @@ def test_compute_resolves_standard_local_authority_assets_from_default_local_dir assert impact.weight_matrix_path == h5_path assert impact.local_authority_csv_path == csv_path assert len(impact.local_authority_results) == 2 + + +def test_compute_local_authority_impacts_local_only_does_not_call_gcs(tmp_path): + baseline = _make_sim( + { + "household_net_income": [100.0], + "household_weight": [1.0], + } + ) + reform = _make_sim( + { + "household_net_income": [115.0], + "household_weight": [1.0], + } + ) + + with ( + patch( + "policyengine.outputs.uk_geography_assets.default_local_search_dirs", + return_value=[tmp_path / "missing"], + ), + patch("policyengine_core.tools.google_cloud.download_gcs_file") as download, + ): + with pytest.raises(FileNotFoundError) as exc_info: + compute_uk_local_authority_impacts( + baseline, + reform, + download_missing_assets=False, + ) + + download.assert_not_called() + assert "GCS fallback disabled by download_missing_assets=False" in str( + exc_info.value + ) diff --git a/tests/test_uk_geography_assets.py b/tests/test_uk_geography_assets.py index 89f9ed68..3668cd12 100644 --- a/tests/test_uk_geography_assets.py +++ b/tests/test_uk_geography_assets.py @@ -105,6 +105,18 @@ def test_resolver_local_only_does_not_download_missing_standard_files(): assert spec.weight_matrix_filename in str(exc_info.value) +def test_resolver_rejects_downloading_strategy_when_downloads_disabled(tmp_path): + with pytest.raises(ValueError) as exc_info: + resolve_uk_geography_asset_paths( + CONSTITUENCY_ASSET_SPEC, + asset_strategies=[GCSUKGeographyAssetStrategy(download_dir=tmp_path)], + download_missing_assets=False, + ) + + assert "download_missing_assets=False" in str(exc_info.value) + assert "GCSUKGeographyAssetStrategy" in str(exc_info.value) + + def test_resolver_rejects_missing_explicit_path_before_strategies(tmp_path): class ShouldNotRunStrategy(UKGeographyAssetStrategy): def resolve(self, *args, **kwargs): From 99f6cd351b08643de7fa824f4d765fd6ada063cf Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Thu, 21 May 2026 19:09:28 +0200 Subject: [PATCH 5/5] Use UK geography resolver for scoped regions --- src/policyengine/core/scoping_strategy.py | 46 +-- src/policyengine/data/uk_geography_assets.py | 303 ++++++++++++++++- .../outputs/uk_geography_assets.py | 304 +----------------- tests/test_constituency_impact.py | 2 +- tests/test_local_authority_impact.py | 2 +- tests/test_scoping_strategy.py | 100 +++++- 6 files changed, 426 insertions(+), 331 deletions(-) diff --git a/src/policyengine/core/scoping_strategy.py b/src/policyengine/core/scoping_strategy.py index 81778f47..c77b06d5 100644 --- a/src/policyengine/core/scoping_strategy.py +++ b/src/policyengine/core/scoping_strategy.py @@ -6,12 +6,11 @@ a specific value (e.g., UK countries by 'country' field, US places by 'place_fips'). 2. WeightReplacementStrategy: Replaces household weights from a pre-computed weight - matrix stored in GCS (e.g., UK constituencies and local authorities). + matrix resolved locally or from GCS (e.g., UK constituencies and local authorities). """ import logging from abc import abstractmethod -from pathlib import Path from typing import Annotated, Literal, Optional, Union import numpy as np @@ -93,7 +92,7 @@ class WeightReplacementStrategy(RegionScopingStrategy): Used for UK constituencies and local authorities. Instead of removing households, this strategy keeps all households but replaces their weights - with region-specific values from a weight matrix stored in GCS. + with region-specific values from a locally cached or downloaded weight matrix. The weight matrix is an HDF5 file with shape (N_regions x N_households), where each row contains household weights for a specific region. @@ -106,6 +105,7 @@ class WeightReplacementStrategy(RegionScopingStrategy): lookup_csv_bucket: str lookup_csv_key: str region_code: str + download_missing_assets: bool = True def apply( self, @@ -113,35 +113,39 @@ def apply( group_entities: list[str], year: int, ) -> dict[str, MicroDataFrame]: - from policyengine_core.tools.google_cloud import download_gcs_file + from policyengine.data.uk_geography_assets import ( + UKGeographyAssetSpec, + resolve_uk_geography_asset_paths, + ) - # Download lookup CSV and find region index - lookup_path = Path( - download_gcs_file( - bucket=self.lookup_csv_bucket, - file_path=self.lookup_csv_key, - ) + paths = resolve_uk_geography_asset_paths( + UKGeographyAssetSpec( + geography_type="weight replacement", + weight_matrix_filename=self.weight_matrix_key, + lookup_csv_filename=self.lookup_csv_key, + bucket=self.weight_matrix_bucket, + weight_matrix_bucket=self.weight_matrix_bucket, + lookup_csv_bucket=self.lookup_csv_bucket, + ), + download_missing_assets=self.download_missing_assets, ) - lookup_df = pd.read_csv(lookup_path) + + lookup_df = pd.read_csv(paths.lookup_csv_path) region_id = self._find_region_index(lookup_df, self.region_code) - # Download weight matrix and extract weights for this region. + # Load weight matrix and extract weights for this region. # h5py is only needed here, so import lazily to keep # `from policyengine.core import ...` light. import h5py - weights_path = download_gcs_file( - bucket=self.weight_matrix_bucket, - file_path=self.weight_matrix_key, - ) - with h5py.File(weights_path, "r") as f: + with h5py.File(paths.weight_matrix_path, "r") as f: weights = f[str(year)][...] region_weights = weights[region_id] # Validate weight row length matches household count - household_df = pd.DataFrame(entity_data["household"]) + household_df = pd.DataFrame(entity_data["household"]).copy() if len(region_weights) != len(household_df): raise ValueError( f"Weight matrix row length ({len(region_weights)}) does not match " @@ -152,9 +156,9 @@ def apply( # Replace household weights result = {} for entity_name, mdf in entity_data.items(): - df = pd.DataFrame(mdf) + df = pd.DataFrame(mdf).copy() if entity_name == "household": - df["household_weight"] = region_weights + df.loc[:, "household_weight"] = region_weights result[entity_name] = MicroDataFrame(df, weights="household_weight") else: weight_col = f"{entity_name}_weight" @@ -174,7 +178,7 @@ def apply( for hh_id in df[person_hh_col].values ] ) - df[weight_col] = new_weights + df.loc[:, weight_col] = new_weights result[entity_name] = MicroDataFrame( df, diff --git a/src/policyengine/data/uk_geography_assets.py b/src/policyengine/data/uk_geography_assets.py index f26e14e3..14257314 100644 --- a/src/policyengine/data/uk_geography_assets.py +++ b/src/policyengine/data/uk_geography_assets.py @@ -1,9 +1,14 @@ -"""Canonical UK geography asset metadata.""" +"""Canonical UK geography asset metadata and resolution helpers.""" +import os from dataclasses import dataclass +from pathlib import Path +from typing import Optional, Sequence, Union UK_GEOGRAPHY_BUCKET = "policyengine-uk-data-private" UK_GEOGRAPHY_BUCKET_URI = f"gs://{UK_GEOGRAPHY_BUCKET}" +UK_GEOGRAPHY_DATA_DIR_ENV = "POLICYENGINE_UK_GEOGRAPHY_DATA_DIR" +POLICYENGINE_DATA_FOLDER_ENV = "POLICYENGINE_DATA_FOLDER" @dataclass(frozen=True) @@ -14,6 +19,24 @@ class UKGeographyAssetSpec: weight_matrix_filename: str lookup_csv_filename: str bucket: str = UK_GEOGRAPHY_BUCKET + weight_matrix_bucket: Optional[str] = None + lookup_csv_bucket: Optional[str] = None + + @property + def resolved_weight_matrix_bucket(self) -> str: + return self.weight_matrix_bucket or self.bucket + + @property + def resolved_lookup_csv_bucket(self) -> str: + return self.lookup_csv_bucket or self.bucket + + +@dataclass(frozen=True) +class UKGeographyAssetPaths: + """Resolved local paths for a UK geography output.""" + + weight_matrix_path: str + lookup_csv_path: str CONSTITUENCY_ASSET_SPEC = UKGeographyAssetSpec( @@ -27,3 +50,281 @@ class UKGeographyAssetSpec: weight_matrix_filename="local_authority_weights.h5", lookup_csv_filename="local_authorities_2021.csv", ) + + +def _env_path(name: str) -> Optional[Path]: + value = os.environ.get(name) + if not value: + return None + return Path(value).expanduser() + + +def _dedupe_paths(paths: Sequence[Path]) -> list[Path]: + result: list[Path] = [] + seen: set[str] = set() + for path in paths: + key = str(path.resolve()) if path.exists() else str(path.absolute()) + if key in seen: + continue + seen.add(key) + result.append(path) + return result + + +def default_local_search_dirs() -> list[Path]: + """Return directories searched before downloading UK geography assets.""" + candidates = [ + _env_path(UK_GEOGRAPHY_DATA_DIR_ENV), + _env_path(POLICYENGINE_DATA_FOLDER_ENV), + Path.home() / ".policyengine" / "uk-geography", + Path(".datasets"), + Path.cwd(), + ] + return _dedupe_paths([path for path in candidates if path is not None]) + + +def default_download_dir() -> Path: + """Return the default GCS download cache for UK geography assets.""" + configured = _env_path(UK_GEOGRAPHY_DATA_DIR_ENV) or _env_path( + POLICYENGINE_DATA_FOLDER_ENV + ) + if configured is not None: + return configured + return Path.home() / ".policyengine" / "uk-geography" + + +def _validate_explicit_path(path: Optional[str], *, asset_label: str) -> None: + if not path: + return + + candidate = Path(path).expanduser() + if not candidate.is_file(): + raise FileNotFoundError( + f"Provided UK geography {asset_label} path does not exist " + f"or is not a file: {candidate}" + ) + + +class UKGeographyAssetStrategy: + """Base class for UK geography asset resolution strategies.""" + + downloads_missing_assets = False + last_error: Optional[str] = None + + def resolve( + self, + spec: UKGeographyAssetSpec, + *, + weight_matrix_path: Optional[str] = None, + lookup_csv_path: Optional[str] = None, + ) -> Optional[UKGeographyAssetPaths]: + raise NotImplementedError + + +PathLike = Union[os.PathLike, str] + + +class LocalUKGeographyAssetStrategy(UKGeographyAssetStrategy): + """Resolve geography assets from explicit paths or local search dirs.""" + + def __init__(self, search_dirs: Optional[Sequence[PathLike]] = None): + self.search_dirs = [ + Path(path).expanduser() + for path in ( + search_dirs if search_dirs is not None else default_local_search_dirs() + ) + ] + self.last_error = None + + def _resolve_asset( + self, + *, + explicit_path: Optional[str], + filename: str, + ) -> Optional[Path]: + if explicit_path: + path = Path(explicit_path).expanduser() + if path.is_file(): + return path + + for search_dir in self.search_dirs: + path = search_dir / filename + if path.is_file(): + return path + return None + + def resolve( + self, + spec: UKGeographyAssetSpec, + *, + weight_matrix_path: Optional[str] = None, + lookup_csv_path: Optional[str] = None, + ) -> Optional[UKGeographyAssetPaths]: + weight_path = self._resolve_asset( + explicit_path=weight_matrix_path, + filename=spec.weight_matrix_filename, + ) + csv_path = self._resolve_asset( + explicit_path=lookup_csv_path, + filename=spec.lookup_csv_filename, + ) + + if weight_path is not None and csv_path is not None: + self.last_error = None + return UKGeographyAssetPaths( + weight_matrix_path=str(weight_path), + lookup_csv_path=str(csv_path), + ) + + missing = [] + if weight_path is None: + missing.append(spec.weight_matrix_filename) + if csv_path is None: + missing.append(spec.lookup_csv_filename) + searched = ", ".join(str(path) for path in self.search_dirs) + self.last_error = ( + "local lookup missing " + + ", ".join(missing) + + f"; searched: {searched or ''}" + ) + return None + + +class GCSUKGeographyAssetStrategy(UKGeographyAssetStrategy): + """Download geography assets from the standard PolicyEngine UK GCS bucket.""" + + downloads_missing_assets = True + + def __init__(self, download_dir: Optional[PathLike] = None): + self.download_dir = ( + Path(download_dir).expanduser() + if download_dir is not None + else default_download_dir() + ) + self.last_error = None + + def _download_asset( + self, + *, + bucket: str, + filename: str, + explicit_path: Optional[str], + ) -> Path: + if explicit_path: + path = Path(explicit_path).expanduser() + if path.is_file(): + return path + + target_path = self.download_dir / filename + if target_path.is_file(): + return target_path + + self.download_dir.mkdir(parents=True, exist_ok=True) + + from policyengine_core.tools.google_cloud import download_gcs_file + + downloaded_path = download_gcs_file( + bucket=bucket, + file_path=filename, + local_path=str(target_path), + ) + return Path(downloaded_path).expanduser() + + def resolve( + self, + spec: UKGeographyAssetSpec, + *, + weight_matrix_path: Optional[str] = None, + lookup_csv_path: Optional[str] = None, + ) -> Optional[UKGeographyAssetPaths]: + try: + resolved_weight_matrix_path = self._download_asset( + bucket=spec.resolved_weight_matrix_bucket, + filename=spec.weight_matrix_filename, + explicit_path=weight_matrix_path, + ) + resolved_lookup_csv_path = self._download_asset( + bucket=spec.resolved_lookup_csv_bucket, + filename=spec.lookup_csv_filename, + explicit_path=lookup_csv_path, + ) + except Exception as exc: + self.last_error = f"GCS download failed for {spec.geography_type}: {exc}" + return None + + self.last_error = None + return UKGeographyAssetPaths( + weight_matrix_path=str(resolved_weight_matrix_path), + lookup_csv_path=str(resolved_lookup_csv_path), + ) + + +def default_uk_geography_asset_strategies( + *, + download_missing_assets: bool = True, +) -> list[UKGeographyAssetStrategy]: + """Return the default asset resolution strategy chain.""" + strategies: list[UKGeographyAssetStrategy] = [LocalUKGeographyAssetStrategy()] + if download_missing_assets: + strategies.append(GCSUKGeographyAssetStrategy()) + return strategies + + +def resolve_uk_geography_asset_paths( + spec: UKGeographyAssetSpec, + *, + weight_matrix_path: Optional[str] = None, + lookup_csv_path: Optional[str] = None, + asset_strategies: Optional[Sequence[UKGeographyAssetStrategy]] = None, + download_missing_assets: bool = True, +) -> UKGeographyAssetPaths: + """Resolve required UK geography files with optional GCS fallback.""" + _validate_explicit_path( + weight_matrix_path, + asset_label=f"{spec.geography_type} weight matrix", + ) + _validate_explicit_path( + lookup_csv_path, + asset_label=f"{spec.geography_type} lookup CSV", + ) + + strategies = ( + list(asset_strategies) + if asset_strategies is not None + else default_uk_geography_asset_strategies( + download_missing_assets=download_missing_assets, + ) + ) + if not download_missing_assets: + download_strategy_names = [ + strategy.__class__.__name__ + for strategy in strategies + if strategy.downloads_missing_assets + ] + if download_strategy_names: + raise ValueError( + "download_missing_assets=False cannot be used with asset " + "strategies that download missing assets: " + + ", ".join(download_strategy_names) + ) + + errors = [] + for strategy in strategies: + paths = strategy.resolve( + spec, + weight_matrix_path=weight_matrix_path, + lookup_csv_path=lookup_csv_path, + ) + if paths is not None: + return paths + if strategy.last_error: + errors.append(f"{strategy.__class__.__name__}: {strategy.last_error}") + + detail = "; ".join(errors) if errors else "no asset strategies configured" + if not download_missing_assets: + detail += "; GCS fallback disabled by download_missing_assets=False" + raise FileNotFoundError( + f"Unable to resolve UK {spec.geography_type} geography assets " + f"({spec.weight_matrix_filename}, {spec.lookup_csv_filename}). {detail}. " + f"Set {UK_GEOGRAPHY_DATA_DIR_ENV} or provide explicit paths." + ) diff --git a/src/policyengine/outputs/uk_geography_assets.py b/src/policyengine/outputs/uk_geography_assets.py index 4369710b..55c41052 100644 --- a/src/policyengine/outputs/uk_geography_assets.py +++ b/src/policyengine/outputs/uk_geography_assets.py @@ -1,17 +1,16 @@ -"""Resolve UK geography output assets from local files or GCS.""" - -from __future__ import annotations - -import os -from dataclasses import dataclass -from pathlib import Path -from typing import Optional, Sequence, Union +"""Re-export UK geography asset resolution helpers for output callers.""" from policyengine.data.uk_geography_assets import ( CONSTITUENCY_ASSET_SPEC, LOCAL_AUTHORITY_ASSET_SPEC, UK_GEOGRAPHY_BUCKET, + GCSUKGeographyAssetStrategy, + LocalUKGeographyAssetStrategy, + UKGeographyAssetPaths, UKGeographyAssetSpec, + UKGeographyAssetStrategy, + default_uk_geography_asset_strategies, + resolve_uk_geography_asset_paths, ) __all__ = [ @@ -26,292 +25,3 @@ "default_uk_geography_asset_strategies", "resolve_uk_geography_asset_paths", ] - -UK_GEOGRAPHY_DATA_DIR_ENV = "POLICYENGINE_UK_GEOGRAPHY_DATA_DIR" -POLICYENGINE_DATA_FOLDER_ENV = "POLICYENGINE_DATA_FOLDER" - - -@dataclass(frozen=True) -class UKGeographyAssetPaths: - """Resolved local paths for a UK geography output.""" - - weight_matrix_path: str - lookup_csv_path: str - - -def _env_path(name: str) -> Optional[Path]: - value = os.environ.get(name) - if not value: - return None - return Path(value).expanduser() - - -def _dedupe_paths(paths: Sequence[Path]) -> list[Path]: - result: list[Path] = [] - seen: set[str] = set() - for path in paths: - key = str(path.resolve()) if path.exists() else str(path.absolute()) - if key in seen: - continue - seen.add(key) - result.append(path) - return result - - -def default_local_search_dirs() -> list[Path]: - """Return directories searched before downloading UK geography assets.""" - candidates = [ - _env_path(UK_GEOGRAPHY_DATA_DIR_ENV), - _env_path(POLICYENGINE_DATA_FOLDER_ENV), - Path.home() / ".policyengine" / "uk-geography", - Path(".datasets"), - Path.cwd(), - ] - return _dedupe_paths([path for path in candidates if path is not None]) - - -def default_download_dir() -> Path: - """Return the default GCS download cache for UK geography assets.""" - configured = _env_path(UK_GEOGRAPHY_DATA_DIR_ENV) or _env_path( - POLICYENGINE_DATA_FOLDER_ENV - ) - if configured is not None: - return configured - return Path.home() / ".policyengine" / "uk-geography" - - -def _validate_explicit_path(path: Optional[str], *, asset_label: str) -> None: - if not path: - return - - candidate = Path(path).expanduser() - if not candidate.is_file(): - raise FileNotFoundError( - f"Provided UK geography {asset_label} path does not exist " - f"or is not a file: {candidate}" - ) - - -class UKGeographyAssetStrategy: - """Base class for UK geography asset resolution strategies.""" - - downloads_missing_assets = False - last_error: Optional[str] = None - - def resolve( - self, - spec: UKGeographyAssetSpec, - *, - weight_matrix_path: Optional[str] = None, - lookup_csv_path: Optional[str] = None, - ) -> Optional[UKGeographyAssetPaths]: - raise NotImplementedError - - -PathLike = Union[os.PathLike, str] - - -class LocalUKGeographyAssetStrategy(UKGeographyAssetStrategy): - """Resolve geography assets from explicit paths or local search dirs.""" - - def __init__(self, search_dirs: Optional[Sequence[PathLike]] = None): - self.search_dirs = [ - Path(path).expanduser() - for path in ( - search_dirs if search_dirs is not None else default_local_search_dirs() - ) - ] - self.last_error = None - - def _resolve_asset( - self, - *, - explicit_path: Optional[str], - filename: str, - ) -> Optional[Path]: - if explicit_path: - path = Path(explicit_path).expanduser() - if path.is_file(): - return path - - for search_dir in self.search_dirs: - path = search_dir / filename - if path.is_file(): - return path - return None - - def resolve( - self, - spec: UKGeographyAssetSpec, - *, - weight_matrix_path: Optional[str] = None, - lookup_csv_path: Optional[str] = None, - ) -> Optional[UKGeographyAssetPaths]: - weight_path = self._resolve_asset( - explicit_path=weight_matrix_path, - filename=spec.weight_matrix_filename, - ) - csv_path = self._resolve_asset( - explicit_path=lookup_csv_path, - filename=spec.lookup_csv_filename, - ) - - if weight_path is not None and csv_path is not None: - self.last_error = None - return UKGeographyAssetPaths( - weight_matrix_path=str(weight_path), - lookup_csv_path=str(csv_path), - ) - - missing = [] - if weight_path is None: - missing.append(spec.weight_matrix_filename) - if csv_path is None: - missing.append(spec.lookup_csv_filename) - searched = ", ".join(str(path) for path in self.search_dirs) - self.last_error = ( - "local lookup missing " - + ", ".join(missing) - + f"; searched: {searched or ''}" - ) - return None - - -class GCSUKGeographyAssetStrategy(UKGeographyAssetStrategy): - """Download geography assets from the standard PolicyEngine UK GCS bucket.""" - - downloads_missing_assets = True - - def __init__(self, download_dir: Optional[PathLike] = None): - self.download_dir = ( - Path(download_dir).expanduser() - if download_dir is not None - else default_download_dir() - ) - self.last_error = None - - def _download_asset( - self, - *, - bucket: str, - filename: str, - explicit_path: Optional[str], - ) -> Path: - if explicit_path: - path = Path(explicit_path).expanduser() - if path.is_file(): - return path - - target_path = self.download_dir / filename - if target_path.is_file(): - return target_path - - self.download_dir.mkdir(parents=True, exist_ok=True) - - from policyengine_core.tools.google_cloud import download_gcs_file - - downloaded_path = download_gcs_file( - bucket=bucket, - file_path=filename, - local_path=str(target_path), - ) - return Path(downloaded_path).expanduser() - - def resolve( - self, - spec: UKGeographyAssetSpec, - *, - weight_matrix_path: Optional[str] = None, - lookup_csv_path: Optional[str] = None, - ) -> Optional[UKGeographyAssetPaths]: - try: - resolved_weight_matrix_path = self._download_asset( - bucket=spec.bucket, - filename=spec.weight_matrix_filename, - explicit_path=weight_matrix_path, - ) - resolved_lookup_csv_path = self._download_asset( - bucket=spec.bucket, - filename=spec.lookup_csv_filename, - explicit_path=lookup_csv_path, - ) - except Exception as exc: - self.last_error = f"GCS download failed for {spec.geography_type}: {exc}" - return None - - self.last_error = None - return UKGeographyAssetPaths( - weight_matrix_path=str(resolved_weight_matrix_path), - lookup_csv_path=str(resolved_lookup_csv_path), - ) - - -def default_uk_geography_asset_strategies( - *, - download_missing_assets: bool = True, -) -> list[UKGeographyAssetStrategy]: - """Return the default asset resolution strategy chain.""" - strategies: list[UKGeographyAssetStrategy] = [LocalUKGeographyAssetStrategy()] - if download_missing_assets: - strategies.append(GCSUKGeographyAssetStrategy()) - return strategies - - -def resolve_uk_geography_asset_paths( - spec: UKGeographyAssetSpec, - *, - weight_matrix_path: Optional[str] = None, - lookup_csv_path: Optional[str] = None, - asset_strategies: Optional[Sequence[UKGeographyAssetStrategy]] = None, - download_missing_assets: bool = True, -) -> UKGeographyAssetPaths: - """Resolve required UK geography files with optional GCS fallback.""" - _validate_explicit_path( - weight_matrix_path, - asset_label=f"{spec.geography_type} weight matrix", - ) - _validate_explicit_path( - lookup_csv_path, - asset_label=f"{spec.geography_type} lookup CSV", - ) - - strategies = ( - list(asset_strategies) - if asset_strategies is not None - else default_uk_geography_asset_strategies( - download_missing_assets=download_missing_assets, - ) - ) - if not download_missing_assets: - download_strategy_names = [ - strategy.__class__.__name__ - for strategy in strategies - if strategy.downloads_missing_assets - ] - if download_strategy_names: - raise ValueError( - "download_missing_assets=False cannot be used with asset " - "strategies that download missing assets: " - + ", ".join(download_strategy_names) - ) - - errors = [] - for strategy in strategies: - paths = strategy.resolve( - spec, - weight_matrix_path=weight_matrix_path, - lookup_csv_path=lookup_csv_path, - ) - if paths is not None: - return paths - if strategy.last_error: - errors.append(f"{strategy.__class__.__name__}: {strategy.last_error}") - - detail = "; ".join(errors) if errors else "no asset strategies configured" - if not download_missing_assets: - detail += "; GCS fallback disabled by download_missing_assets=False" - raise FileNotFoundError( - f"Unable to resolve UK {spec.geography_type} geography assets " - f"({spec.weight_matrix_filename}, {spec.lookup_csv_filename}). {detail}. " - f"Set {UK_GEOGRAPHY_DATA_DIR_ENV} or provide explicit paths." - ) diff --git a/tests/test_constituency_impact.py b/tests/test_constituency_impact.py index 5ec98e99..7450d130 100644 --- a/tests/test_constituency_impact.py +++ b/tests/test_constituency_impact.py @@ -217,7 +217,7 @@ def test_compute_constituency_impacts_local_only_does_not_call_gcs(tmp_path): with ( patch( - "policyengine.outputs.uk_geography_assets.default_local_search_dirs", + "policyengine.data.uk_geography_assets.default_local_search_dirs", return_value=[tmp_path / "missing"], ), patch("policyengine_core.tools.google_cloud.download_gcs_file") as download, diff --git a/tests/test_local_authority_impact.py b/tests/test_local_authority_impact.py index a07b893b..f59c2bb2 100644 --- a/tests/test_local_authority_impact.py +++ b/tests/test_local_authority_impact.py @@ -178,7 +178,7 @@ def test_compute_local_authority_impacts_local_only_does_not_call_gcs(tmp_path): with ( patch( - "policyengine.outputs.uk_geography_assets.default_local_search_dirs", + "policyengine.data.uk_geography_assets.default_local_search_dirs", return_value=[tmp_path / "missing"], ), patch("policyengine_core.tools.google_cloud.download_gcs_file") as download, diff --git a/tests/test_scoping_strategy.py b/tests/test_scoping_strategy.py index 334cad1b..3e8d94b9 100644 --- a/tests/test_scoping_strategy.py +++ b/tests/test_scoping_strategy.py @@ -79,8 +79,10 @@ def test__given_weight_replacement__then_serialization_roundtrip_works( @patch("policyengine_core.tools.google_cloud.download_gcs_file") def test__given_weight_replacement__then_apply_replaces_weights( - self, mock_download, uk_test_entity_data, tmp_path + self, mock_download, uk_test_entity_data, tmp_path, monkeypatch ): + monkeypatch.setenv("POLICYENGINE_UK_GEOGRAPHY_DATA_DIR", str(tmp_path)) + # Set up mock lookup CSV lookup_csv_path = tmp_path / "lookup.csv" lookup_df = pd.DataFrame( @@ -108,10 +110,6 @@ def test__given_weight_replacement__then_apply_replaces_weights( ), ) - mock_download.side_effect = lambda bucket, file_path: ( - str(lookup_csv_path) if file_path.endswith(".csv") else str(weights_h5_path) - ) - strategy = WeightReplacementStrategy( weight_matrix_bucket="test-bucket", weight_matrix_key="weights.h5", @@ -138,11 +136,14 @@ def test__given_weight_replacement__then_apply_replaces_weights( # All 6 persons should still be present person_df = pd.DataFrame(result["person"]) assert len(person_df) == 6 + mock_download.assert_not_called() @patch("policyengine_core.tools.google_cloud.download_gcs_file") def test__given_weight_replacement__then_raises_on_dimension_mismatch( - self, mock_download, uk_test_entity_data, tmp_path + self, mock_download, uk_test_entity_data, tmp_path, monkeypatch ): + monkeypatch.setenv("POLICYENGINE_UK_GEOGRAPHY_DATA_DIR", str(tmp_path)) + lookup_csv_path = tmp_path / "lookup.csv" pd.DataFrame({"code": ["R001"], "name": ["Region A"]}).to_csv( lookup_csv_path, index=False @@ -155,10 +156,6 @@ def test__given_weight_replacement__then_raises_on_dimension_mismatch( with h5py.File(weights_h5_path, "w") as f: f.create_dataset("2024", data=np.array([[100.0, 200.0]])) - mock_download.side_effect = lambda bucket, file_path: ( - str(lookup_csv_path) if file_path.endswith(".csv") else str(weights_h5_path) - ) - strategy = WeightReplacementStrategy( weight_matrix_bucket="test-bucket", weight_matrix_key="weights.h5", @@ -173,6 +170,89 @@ def test__given_weight_replacement__then_raises_on_dimension_mismatch( group_entities=["benunit", "household"], year=2024, ) + mock_download.assert_not_called() + + @patch("policyengine_core.tools.google_cloud.download_gcs_file") + def test__given_weight_replacement_local_only_missing_assets__then_does_not_download( + self, mock_download, uk_test_entity_data, tmp_path, monkeypatch + ): + monkeypatch.setenv( + "POLICYENGINE_UK_GEOGRAPHY_DATA_DIR", + str(tmp_path / "missing-assets"), + ) + + strategy = WeightReplacementStrategy( + weight_matrix_bucket="test-bucket", + weight_matrix_key="missing-test-weights.h5", + lookup_csv_bucket="test-bucket", + lookup_csv_key="missing-test-lookup.csv", + region_code="R001", + download_missing_assets=False, + ) + + with pytest.raises(FileNotFoundError, match="GCS fallback disabled"): + strategy.apply( + entity_data=uk_test_entity_data, + group_entities=["benunit", "household"], + year=2024, + ) + mock_download.assert_not_called() + + @patch("policyengine_core.tools.google_cloud.download_gcs_file") + def test__given_weight_replacement_missing_local_assets__then_downloads( + self, mock_download, uk_test_entity_data, tmp_path, monkeypatch + ): + cache_dir = tmp_path / "cache" + monkeypatch.setenv("POLICYENGINE_UK_GEOGRAPHY_DATA_DIR", str(cache_dir)) + + def fake_download_gcs_file(*, bucket, file_path, local_path): + if file_path == "lookup.csv": + assert bucket == "lookup-bucket" + pd.DataFrame( + { + "code": ["R001", "R002"], + "name": ["Region A", "Region B"], + } + ).to_csv(local_path, index=False) + return local_path + + assert bucket == "weights-bucket" + assert file_path == "weights.h5" + import h5py + + with h5py.File(local_path, "w") as f: + f.create_dataset( + "2024", + data=np.array( + [ + [100.0, 200.0, 300.0], + [400.0, 500.0, 600.0], + ] + ), + ) + return local_path + + mock_download.side_effect = fake_download_gcs_file + strategy = WeightReplacementStrategy( + weight_matrix_bucket="weights-bucket", + weight_matrix_key="weights.h5", + lookup_csv_bucket="lookup-bucket", + lookup_csv_key="lookup.csv", + region_code="R002", + ) + + result = strategy.apply( + entity_data=uk_test_entity_data, + group_entities=["benunit", "household"], + year=2024, + ) + + household_df = pd.DataFrame(result["household"]) + np.testing.assert_array_almost_equal( + household_df["household_weight"].values, + [400.0, 500.0, 600.0], + ) + assert mock_download.call_count == 2 def test__given_weight_replacement__then_cache_key_is_descriptive(self): strategy = WeightReplacementStrategy(