diff --git a/compute-feasibility-advisor-proposal.md b/compute-feasibility-advisor-proposal.md new file mode 100644 index 000000000..7ebf70bd9 --- /dev/null +++ b/compute-feasibility-advisor-proposal.md @@ -0,0 +1,281 @@ +# Compute Feasibility Advisor for AutoIntent + +- **Date:** 2026-05-23 +- **Status:** Proposal (pre-implementation) +- **Audience:** AutoIntent maintainers / contributor picking up the task +- **Scope of this document:** technical specification — *what* the advisor estimates and the formulas it uses. Architectural and system-design choices (where the advisor lives in the codebase, how it integrates with the optimizer, the public API surface, file/module layout) are deliberately left to the implementer. + +## Problem + +AutoIntent's main strength is letting a user kick off a full search-space optimization with one call: + +```python +pipeline = Pipeline.from_preset("transformers-heavy") +pipeline.fit(dataset) +``` + +The cost of that convenience is that users — especially those running on a laptop, a single consumer GPU, or a free cloud instance — cannot tell ahead of time whether their hardware can carry the configuration they have just selected. + +Concrete failure cases we see today: + +- `transformers-heavy` fine-tunes `microsoft/deberta-v3-large` for up to 30 epochs across 40 HPO trials. That needs ~12–18 GB VRAM (full fine-tune, fp32) and many hours of wall time on a single GPU. A user with an 8 GB card finds out by OOM, often several minutes into a run. +- Swapping `intfloat/multilingual-e5-large-instruct` (2 GB) for `sentence-transformers/all-MiniLM-L6-v2` (90 MB) changes the resource bill by an order of magnitude — but nothing surfaces this difference up front. +- Disk is a silent failure mode: a search space referencing several large checkpoints can pull >10 GB into the HF cache before any training starts. + +The target audience for this feature is users with limited resources who pick a preset, hit `fit()`, and want to know within a second whether they should change something. + +## Proposed solution: pre-flight resource advisor + +Add a **pre-flight advisor** that, given a parsed search space and a dataset, estimates worst-case disk, RAM, VRAM, and wall-time requirements from public Hugging Face Hub metadata and a small set of formulas, then prints a clear summary with red/yellow/green warnings. By default it is **report-only and never blocks the run**; an opt-in **reduce-to-fit** mode additionally prunes the search space to fit detected hardware. + +### Scope + +The advisor analyses only the **local, model-bearing** modules whose footprint can be derived from HF Hub metadata. Everything else is either trivial or out of band. + + +| Module category | In scope? | Reason | +| -------------------------------------------------------------------------------- | --------- | -------------------------------------------------- | +| `SentenceTransformerEmbeddingConfig` | yes | local transformer, dominant cost on small machines | +| `VllmEmbeddingConfig` | yes | local transformer with extra engine overhead | +| `HFModelConfig`-based scorers (`bert`, `lora`, `ptuning`, `dnnc`, cross-encoder) | yes | the actual heavyweights | +| GCN scorer when configured with a transformer backbone | yes | inherits the backbone cost | +| `LinearScorer` (sklearn `LogisticRegression` / `LogisticRegressionCV`) | yes | dominant cost on presets with no transformer fine-tune; the CV path multiplies a single fit by ~30 | +| `CatBoostScorer` | yes | dominant cost on presets with no transformer fine-tune; high default `iterations` | +| `OpenaiEmbeddingConfig` | no | no local resources to estimate | +| `HashingVectorizerEmbeddingConfig` | no | trivial cost | +| `knn`, `mlknn`, generic `sklearn` classifiers via `SklearnScorer`, `description` | no | bounded so far below any in-scope module that they cannot plausibly be the bottleneck | +| `decision` and `regex` nodes | no | negligible | + + +Rationale: the user's real risk is whichever module is the actual bottleneck. On heavy presets that is a transformer fine-tune; on light presets it shifts to `linear` (CV-multiplied) or `catboost` (1000 default iterations × dataset shape). Modules left out of scope are ones whose cost is bounded so far below any in-scope module that they cannot plausibly be the reason a run fails. + +### Phases + +The advisor is one entry point, but internally splits work into three phases that share a single `PreflightReport` object. The split is internal organization — all three run at the same hook point (after `validate_modules`, before `_fit(context)`) and the user sees one summary. Separating them keeps each phase's inputs, formulas, and failure modes scoped: + +- **Resource phase.** Disk / RAM / VRAM / wall-time estimates and comparisons against detected hardware. Most of the formulas in this document live here. This is the only phase consumed by the reduce-to-fit pruner. +- **Data quality phase.** Findings derived from the dataset jointly with the active search space — token-length truncation, split readiness (auto-invokes the existing `check_split_readiness` utility rather than re-implementing it), partial intent descriptions paired with the `description` scorer, embedder/scorer dimension consistency. Reports red/yellow lines but never prunes the search space; the user fixes the dataset or the config. +- **Configuration sanity phase.** Joint checks across dataset + search-space + hardware that don't slot cleanly into the other two — e.g., `hpo_config.n_jobs > 1` × per-trial VRAM contention, CatBoost `task_type="GPU"` with no CUDA. Pydantic schema validation already runs upstream on `OptimizationConfig`; this phase only adds checks that need joint inspection. + +The advisor consumes `validate_modules`'s *post-filter* view of `self.nodes` — it does not duplicate that mutating filter. + +### Inputs + +- The parsed `OptimizationConfig` (search space, HPO config, embedder/transformer configs). +- The training `Dataset` (for `dataset_size` and an approximate token-length distribution). +- Detected local hardware: + - Total / available RAM via `psutil`. + - Free disk on the AutoIntent / HF cache directory via `shutil.disk_usage`. + - Accelerator detection, in priority order: + - **CUDA:** per-GPU VRAM and device name via `torch.cuda`. + - **MPS (Apple Silicon):** detected via `torch.backends.mps.is_available()`. Apple chips use unified memory, so there is no separate VRAM pool — the "VRAM budget" is a fraction of total system RAM. Default budget = 70 % of total RAM (matching the macOS `PYTORCH_MPS_HIGH_WATERMARK_RATIO` default) with the remainder reserved for the OS and other apps. The fraction is exposed as a knob. + - **CPU only:** when neither is available. + +### Output + +A structured estimate plus a human-readable summary printed to the logger. Example: + +``` +Compute feasibility check +───────────────────────── +Resource: + Available : 8 GB VRAM (NVIDIA RTX 3060), 32 GB RAM, 120 GB free disk + Disk : 5.2 GB to download, 1.1 GB already cached (3 unique checkpoints) + RAM : ~4 GB + VRAM : ~14 GB × 2 parallel trials (n_jobs=2) ⚠ exceeds available + Time : ~6 h (+~12 min for refit_after) (single-GPU, fp32, rough) + +Data: + Train tokens p95 : 612 (exceeds bert.max_length=512) ⚠ ~7% truncated + Split readiness : 2 classes have <3 samples — LogisticRegressionCV cv=3 will fail ✗ + +Config: + CatBoost task_type=GPU but no CUDA detected — will fall back to CPU ⚠ + +Drivers of cost: + scoring.bert microsoft/deberta-v3-large full fine-tune × 40 trials × 30 epochs → ~14 GB VRAM, ~5 h + embedder intfloat/multilingual-e5-large-instruct → ~2.2 GB VRAM + +Suggestions: + • Enable mixed precision (fp16/bf16) on the bert scorer + • Reduce batch_size from 64 to 16 or 32 + • Set hpo_config.n_jobs=1 — parallel trials are doubling VRAM demand + • Try preset `transformers-light` or `classic-medium` + +These numbers are heuristic upper bounds, not measurements. +``` + +Numbers are reported with honest precision (one significant figure for time, two for memory) and an explicit "estimate, not measurement" disclaimer. + +### Algorithm (proposal, allowed to adjust) + +1. **Collect candidates.** Walk the search space; collect every unique in-scope module. For transformer-bearing modules the identity is `(module_type, model_name, mode)` with `mode ∈ {inference, lora, full-finetune}`. For `linear` and `catboost` the identity is `(module_type, embedder_name, task_kind)` with `task_kind ∈ {multiclass, multilabel}` — the routing through `LogisticRegressionCV` vs `MultiOutputClassifier`, and CatBoost's per-class trees, both depend on it. Also collect the HPO knobs that drive cost: `n_trials` plus per-module knobs — transformer (`epochs`, `batch_size`, `max_length`, `dtype` ∈ {fp16, bf16, fp32}), `linear` (`cv`, `max_iter`), `catboost` (`iterations`, `depth`, `task_type`, `features_type`). +2. **Resolve checkpoints.** For each unique `model_name`, query HF Hub for safetensors metadata to read parameter count and weight dtype. Fall back to file-size aggregation if safetensors metadata is missing. Fall back to a "unknown — heuristic only" tag with low-confidence labelling if HF Hub is offline or the repo is private. `LinearScorer` and `CatBoostScorer` have no checkpoint of their own; they reuse the embedder resolved by this step in their formulas (their cost is parameterised by `embedder_dim`, not parameter count). +3. **Apply formulas.** All values are honest upper bounds; convergence and early stopping often terminate well below them. + - **Disk** = sum over unique downloadable checkpoints of total file size, plus a small fixed overhead per checkpoint for tokenizers and config. `LinearScorer` and `CatBoostScorer` contribute zero (they consume embedder output that is already accounted for upstream). + - **RAM per module:** + - Transformer modules (any mode): `params × dtype_bytes + dataset_tokens × 4 bytes`, treated as a loose upper bound for tokenized buffers. + - `LinearScorer`: `8 × n_samples × embedder_dim` (float64 data matrix — the dominant term) `+ 8 × n_classes × embedder_dim` (coefficients) `+ ~10 × 8 × embedder_dim` (L-BFGS history). + - `CatBoostScorer`: `4 × n_samples × n_features` (data, float32 internally) `+ 4 × n_features × n_bins` (histograms; default `n_bins = 254`) `+ iterations × 2^depth × ~32 bytes` (tree storage). For `features_type ∈ {embedding, both}`, `n_features = embedder_dim`. For `features_type = text`, `n_features` is the BoW vocab discovered at fit; bound with a coarse default (e.g. 50 000) and tag the estimate low-confidence. + - For `linear` and `catboost`, `embedder_dim` is taken from the largest embedder in the same node group — same worst-case stance as the rest of the estimate. + - **VRAM per module:** + - Inference embedder: `params × dtype_bytes × ~1.3` (small constant for activations). + - Full fine-tune (`bert`, GCN backbone, soft-prompt `ptuning`): `params × dtype_bytes × (1 + 1 + 2)` for weights + grads + Adam state, halved when fp16/bf16 mixed precision is configured. + - LoRA: inference VRAM + a small adapter constant. + - Reranker (cross-encoder, `dnnc`): inference VRAM × small factor for the reranking pass. + - `LinearScorer`: N/A (sklearn is CPU-only). + - `CatBoostScorer`: 0 by default; if `task_type="GPU"` is configured, the RAM formula above lives on device instead. + - **Time per module:** + - Transformer modules: `n_trials × epochs × (dataset_size / batch_size) × per_step_seconds(params, max_length, device_class)`, where `per_step_seconds` is a small static lookup keyed on coarse device class (`cpu`, `low-gpu`, `mid-gpu`, `high-gpu`, `apple-silicon`) auto-detected from `torch.cuda.get_device_name` or `platform`/`torch.backends.mps`. + - `LinearScorer`: `n_trials × C_cpu × n_samples × embedder_dim × max_iter × cv_multiplier × class_multiplier`, where: + - `C_cpu ≈ 1e-8 s` per `(sample × feature × iteration)` on a single modern CPU core. + - `cv_multiplier = Cs × cv + 1 ≈ 31` for the multiclass path (`LogisticRegressionCV` with default `Cs = 10`, repo default `cv = 3`, plus one final refit). `cv_multiplier = 1` for the multilabel path (no inner CV). + - `class_multiplier = n_classes` for the multilabel path (`MultiOutputClassifier` fits one binary LogReg per class); `class_multiplier = 1` otherwise. + - `CatBoostScorer`: `n_trials × iterations × C_device × n_samples × n_features × depth × class_multiplier`, where: + - `C_device ≈ 1e-9 s` on CPU, ~5–20× faster on GPU. Resolve `C_device` via the same `device_class` lookup as the transformer time formula. + - `class_multiplier = n_classes` for both the multiclass `MultiClass` loss (per-class trees per iteration) and the multilabel routing (one CatBoost per class). + - Early stopping is not modelled; `iterations` is treated as the upper bound. + - Total time = sum across modules. MPS time numbers are coarser than CUDA's (one tier for now); we accept that. +4. **Compare to detected hardware.** Per-dimension status is green / yellow / red against a configurable headroom (defaults: **red** if estimate > 100 % of available, **yellow** if > 70 %). On MPS, "VRAM" and "RAM" estimates draw from the same physical pool; we compare *the larger of the two* against the unified-memory budget rather than each independently. +5. **Render summary.** Log at INFO. If any dimension is red, emit at WARNING so it shows in non-logging contexts. + +#### Resource-phase refinements + +These adjust the formulas above for situations that look fine in single-trial isolation but blow up in practice: + +- **Cold-vs-warm HF cache (Tier 1).** Before reporting disk, probe each unique `model_name` against the local HF cache via `huggingface_hub.try_to_load_from_cache` / `scan_cache_dir`, keyed off `HF_HOME`. Split the disk line into `to_download` vs `already_cached`. Treat a repo as cached only if the weight shard (`model.safetensors` or equivalent) is present — not just config/tokenizer files. Without this, a repeated run on the same machine alarms the user about gigabytes they already have. +- **Concurrent-trial × per-trial VRAM (Tier 1).** Multiply the per-trial VRAM estimate by `hpo_config.n_jobs` when `n_jobs > 1` and the active accelerator is GPU. Same for the `dump_modules=True` path on disk: each trial writes module weights to the dump dir, so multiply per-module dump-disk by `n_trials`. vLLM is process-isolated and its contention model differs; note this in the disclaimer. +- **`refit_after=True` time delta (Tier 2).** When `Pipeline.fit(refit_after=True)`, add one full-data training pass per node to the time estimate. Small term but easy to forget; users running close to their time budget care about it. +- **HF Hub reachability probe (Tier 2).** One up-front `HfApi().whoami()` (or unauthenticated `HEAD` to `huggingface.co`) at the start of the phase. On failure, consistently downgrade *all* model entries to the "unknown — heuristic only" path instead of timing out per-model 10× on a 10-model search space. +- **CatBoost `task_type="GPU"` sanity (Tier 2).** When CatBoost is in the search space with `task_type="GPU"` but `torch.cuda.is_available()` is false, tag yellow — CatBoost silently falls back to CPU and the user otherwise sees CPU speeds with no warning. + +### Data quality phase + +The resource phase predicts whether the run *fits*. The data quality phase predicts whether the run *produces a meaningful result*. Both are caught at the same hook point because both have the same failure mode from the user's perspective: hours of compute followed by a cryptic error or a silently degraded model. + +- **Token-length truncation (Tier 1).** Sample ~1000 utterances from the train split, tokenize against each unique transformer's tokenizer, compute `p95_tokens` and `% truncated` against the module's `max_length`. Yellow when >1% truncated; red when >10%. Reuse the tokenizer the resource phase already loaded for parameter-count resolution — don't double-fetch. The existing pipeline silently truncates (sentence-transformers and the HF Trainer both default to `truncation=True`); there is no warning anywhere today. +- **Auto-invoke `check_split_readiness` (Tier 1).** Call the existing utility at `context/data_handler/_readiness_util.py:44–109` with the active `data_config` and surface its `SplitReadinessResult` — it already returns `underpopulated_classes`, `ready`, and a `reason` string, but is not called anywhere from `Pipeline.fit()` today. When `LinearScorer` with CV is in the search space and any class has `n < cv`, name the module by name in the red line ("`LogisticRegressionCV` cv=3 will fail: classes [X, Y] have <3 samples") rather than emitting a generic split-readiness message. +- **Partial intent descriptions × `description` scorer (Tier 1).** The dataset constructor already warns once at import when *some* but not all intents have descriptions (`_dataset/_dataset.py:199–207`). The advisor escalates this to red when the `description` scorer is also present in the active search space — otherwise the run will produce NaN embeddings for the missing intents. Action message: "fill in N missing descriptions", not "drop the scorer". +- **Embedder ↔ scorer dimension consistency (Tier 2).** For `LinearScorer` / `CatBoostScorer` with `features_type="both"`, verify the embedder reachable from the same node group exposes a stable, expected dimension. Cross-node walk; surface as yellow when the resolved dimension cannot be confirmed pre-flight. + +### Configuration sanity phase + +Pydantic schema validation on `OptimizationConfig` runs upstream at config-load time; this phase only adds checks that require *joint* inspection of dataset + search-space + hardware. With Tier 1 + Tier 2 in scope today, this phase holds two items: + +- The `n_jobs × VRAM` callout, surfaced jointly with the resource phase (single line in the rendered output). +- The CatBoost `task_type="GPU"` without CUDA check, same. + +Both could live entirely in the resource phase; they get their own phase because future additions — joint scorer↔decision shape checks, OOS-support mismatches detected up front rather than at module instantiation, embedder-dimension mismatches — slot here naturally. Keep the phase scaffold even if it is currently thin. + +### Failure modes + +- **HF Hub offline or private repo:** fall back to "unknown model — name-pattern heuristic only", explicit low-confidence label, never raise. +- **No accelerator (no CUDA and no MPS):** report VRAM as N/A and mark GPU-only modules as "requires GPU" without estimating a (misleading) CPU wall time. +- **MPS configured but a module is incompatible:** vLLM in particular does not run on MPS. Flag the module as "unsupported on MPS" rather than estimating; do not raise. +- **MPS with CPU fallback ops:** some PyTorch ops fall back to CPU on MPS, inflating system-RAM usage and wall time beyond the heuristic. Note this in the disclaimer; we don't try to model it. +- **vLLM configured but not installed:** still estimate (the VRAM accounting is similar), note that the engine itself has additional overhead not captured. +- **Estimate wildly wrong vs. reality:** always-on disclaimer in the printed summary that these are heuristic upper bounds. + +### Reduce-to-fit mode + +The feasibility check has two modes sharing the same estimation pipeline: + +- **Report mode (default).** Print the summary, return the structured estimate, let the run proceed regardless of severity. +- **Reduce-to-fit mode (opt-in).** Additionally prune the search space to fit detected hardware before the run starts. Same estimates, same comparisons — just one extra step that produces a reduced search space. + +Reduce-to-fit consumes only the **resource phase** output. Data-quality and config-sanity findings are reported but never trigger pruning — they require user action (fix the dataset, change a config flag), not search-space narrowing. + +Using the same per-module estimates, the pruner applies three least-destructive steps in order: + +1. **Filter discrete-choice hyperparameters.** For lists of cost-driving values (model name, batch size, training epochs, CatBoost `iterations` / `depth`, sklearn `cv`), keep only entries whose worst-case estimate fits. +2. **Cap continuous ranges.** For `{low, high}` ranges of cost-driving parameters, lower the upper bound to the largest fitting value. Ranges of non-cost parameters (learning rate, decision thresholds) are not touched. +3. **Drop module variants.** If a module entry has any required hyperparameter with no satisfiable value left, drop that module entry from its node's search space. + +Guard rails: + +- If pruning would leave any node's search space empty, the pruner **raises**. We don't silently produce a non-runnable pipeline, and we don't quietly fall back to report-only — failing loudly is the right contract for a mode whose whole purpose is to make the run feasible. The error message points the user toward a lighter preset. +- Time is not used as a filter — only memory and disk are. Time is still reported. +- Headroom thresholds are intentionally generous to avoid over-pruning and are configurable. + +Alongside the standard estimate, the caller receives a structured description of what was filtered, capped, and dropped, plus the resulting search space and its recomputed (now green) estimate. + +**Drawbacks worth surfacing.** + +- **Silent narrowing of intent.** A search space deliberately written to include heavy/light variants for comparison gets halved. The mode is opt-in for this reason. +- **Over-pruning when our formulas overestimate.** A 30 %-high estimate on a borderline configuration throws away a run that would have succeeded. Generous headroom defaults mitigate; the knob is exposed. +- **Hard failure when nothing fits.** Raising is intentional — silent degradation to report-only would defeat the mode's purpose — but it is a sharper edge than report mode has. +- **Pre-trial only.** The rewrite happens before any HPO trial starts. This is fine because the search space is treated as immutable across a study, but worth calling out so nobody tries to make this dynamic later. + +### CLI surface + +The advisor is also exposed as a console script (`autointent-advisor`) so users can answer "what will this cost?" and "what should I run?" without writing Python. Two subcommands: + +- **`autointent-advisor inspect `.** Resolves the preset (or a user-supplied `OptimizationConfig`), detects local hardware, runs the same three-phase advisor that `Pipeline.fit()` runs, and prints the same report. Accepts `--dataset` for a real dataset, or `--n-samples / --n-classes / --avg-tokens` placeholders when the dataset is not yet built — so the script is useful before any training data exists. `--json` emits the structured `PreflightReport` for scripting. +- **`autointent-advisor recommend [--n-samples ... | --dataset ...] [--budget-time 12h] [--budget-vram-gb 8]`.** Detects local hardware (with manual overrides applied), iterates over the bundled presets in `_presets/`, and tags each as `feasible` / `feasible-with-reduce` / `infeasible`. Ranks feasible presets by quality tier (`heavy > medium > light`) then estimated wall-time; picks the top one as the recommendation. For the heaviest infeasible preset, surfaces the single most-impactful knob change that would make it fit (e.g., "`transformers-heavy` would fit if `batch_size` ≤ 16 and `dtype=fp16`"), reusing the reduce-to-fit pruner's per-knob delta info. + +**Constraints (both subcommands).** No model downloads — only HF Hub metadata endpoints (`HfApi().model_info`); never `from_pretrained`. Offline-safe — on Hub unreachability, fall back to the same "heuristic only" path and mark the report low-confidence; do not raise. Hardware-detection failures (broken CUDA install where `torch.cuda.mem_get_info()` raises) fall back to CPU detection and tag the report rather than crashing. + +## Alternatives considered and rejected + +### B. Smoke-test calibration + +Run each unique module for one mini-batch / one step before the real fit, measure peak RAM and VRAM with `psutil`, `tracemalloc`, and `torch.cuda.max_memory_allocated`, time the step, and extrapolate to the full search space. + +Rejected because: + +- It **downloads weights just to estimate** — the disk-headroom check we wanted to provide is defeated by the act of performing it. +- It can **OOM while predicting OOM**, exactly on the constrained hardware that is the target audience. +- It adds **seconds to minutes** of wall time before `fit()` does anything, surprising users. +- It needs per-module "tiny run" hooks; not every scorer has a clean "stop after one step" path. +- For OpenAI- or vLLM-served embedders, a smoke test costs real money or starts the engine. +- Still not accurate due to CUDA and CPU cache, memory heating and so on. + +### C. Curated benchmark table + +Ship a JSON in the package with measured VRAM and per-step time for the bundled-preset checkpoints, broken out by hardware class (cpu / mid-gpu / high-gpu) and mode (inference / lora / full-finetune). Fall back to heuristics for unknown checkpoints. + +Rejected because: + +- **Maintenance burden:** every new model added to a preset would need entries across the hardware × precision × mode matrix. +- Numbers **go stale** when `transformers` updates change defaults (attention impl, dtype, gradient checkpointing). +- It still needs the chosen-solution heuristics as a long-tail fallback — so it adds work on top of Option A without replacing it. +- **Confident-but-wrong is worse than honest-but-fuzzy.** A table that says "4 GB on 4090" when the user OOMs at 4.5 GB damages trust more than a clearly-labelled range would. + +### D. Layered (A by default, opt-in B, embedded table from C, local actuals cache) + +Combine all three: ship A as the fast path, allow `calibrate=True` to trigger B for heavy modules only, embed a small table from C for the bundled-preset checkpoints, and write actuals from every real run to a local cache that feeds back into future estimates. + +Rejected because: + +- **Implementation surface multiplies:** two estimation code paths to keep consistent, a cache schema with versioning and eviction, two failure modes to document. +- **Discoverability:** users may not learn about `calibrate=True` and the realized value compresses back to roughly Option A anyway. +- The team's bandwidth doesn't justify the marginal accuracy gain over A for the target audience. + +## Comparison + + +| Dimension | A (chosen) | B (smoke-test) | C (benchmark table) | D (layered) | +| -------------------------------- | ------------------------------ | ---------------------- | ---------------------------------- | ------------------------------------- | +| Wall time at pre-flight | < 1 s | seconds–minutes | < 1 s | < 1 s default, s–min when calibrating | +| Accuracy on common checkpoints | medium | high | high | high | +| Accuracy on custom checkpoints | medium | high | medium (fallback) | medium–high | +| Time-estimate quality | low–medium | high | high | high | +| Disk pre-download required | no | yes | no | only when calibrating | +| Risk of OOM during the check | none | real | none | only when calibrating | +| Network usage | 1 cached call per unique model | none beyond normal fit | none | combination | +| Implementation effort | small | large | medium + ongoing benchmark refresh | large + cache infra | +| Ongoing maintenance | low (formulas only) | low | high | high | +| Friendly to offline / air-gapped | with fallback | yes | yes | partial | + + +The chosen solution accepts a real accuracy gap on time and a moderate accuracy gap on VRAM in exchange for the only profile that fits the target audience's constraints: zero added wall time, zero added downloads, zero added failure modes, and a small one-time implementation cost. + +## Out of scope (possible follow-ups) + +- Live resource observability during `fit()` (peak RAM / VRAM per trial, abort on overrun). +- A learned calibration cache from real runs to refine estimates over time. +- **Determinism / `cudnn.deterministic` check.** Belongs in seed-setting code (`set_seed` utility, `Pipeline.__init__`), not in a feasibility advisor — reproducibility is not a hardware-budget question. +- **OpenAI / Generator token-cost ($) estimation.** Real value, but pricing tables age badly, the `StructuredOutputCache` hit rate is unknowable upfront, and the API-paying audience overlaps poorly with this advisor's stated audience (resource-constrained local users). Push to a separate `cost_estimator` tool. +- **Predictive CO₂ / emissions.** `_callbacks/emissions_tracker.py` already does this retrospectively, accurately. A predictive version multiplies our (loose) time estimate by a regional kWh/CO₂ factor — two sources of imprecision compounded. The retrospective number is the trustworthy one. +- **vLLM startup compile time.** Minutes of overhead before any work, but vLLM is unsupported on MPS, isn't the dominant cost on CUDA once running, and modelling it needs a startup-time lookup table. Note once in the disclaimer; do not model. + diff --git a/pyproject.toml b/pyproject.toml index d39e3ffa9..7677ac2aa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,7 @@ dependencies = [ "aiometer (>=1.0.0,<2.0.0)", "aiofiles (>=24.1.0,<25.0.0)", "threadpoolctl (>=3.0.0,<4.0.0)", + "psutil (>=5.9.0,<8.0.0)", ] [project.optional-dependencies] @@ -119,6 +120,7 @@ typing = [ "joblib-stubs (>=1.4.2.5.20240918,<2.0.0)", "pandas-stubs (>= 2.2.3.250527, <3.0.0)", "types-aiofiles (>=24.1.0.20250606)", + "types-psutil>=7.2.2.20260518", ] docs = [ "sphinx (>=8.1.3,<9.0.0)", @@ -143,6 +145,7 @@ Documentation = "https://deeppavlov.github.io/AutoIntent/" [project.scripts] "basic-aug" = "autointent.generation.utterances.basic.cli:main" "evolution-aug" = "autointent.generation.utterances.evolution.cli:main" +"autointent-advisor" = "autointent._advisor._cli:main" [build-system] requires = ["uv_build>=0.8.7,<0.9.0"] diff --git a/src/autointent/_advisor/__init__.py b/src/autointent/_advisor/__init__.py new file mode 100644 index 000000000..d8eb6f5ba --- /dev/null +++ b/src/autointent/_advisor/__init__.py @@ -0,0 +1,29 @@ +"""Pre-flight compute feasibility advisor. + +Exposes a small surface used by both ``Pipeline.fit()`` (future integration) and +the ``autointent-advisor`` CLI script. See ``compute-feasibility-advisor-proposal.md`` +at the repo root for the design document. +""" + +from __future__ import annotations + +from ._estimates import run_preflight +from ._hardware import HardwareProfile, detect_hardware +from ._report import DatasetStats, Finding, PreflightReport, RecommendationResult, ResourceEstimate, Severity +from ._workflows import inspect, load_config, recommend, stats_from_dataset + +__all__ = [ + "DatasetStats", + "Finding", + "HardwareProfile", + "PreflightReport", + "RecommendationResult", + "ResourceEstimate", + "Severity", + "detect_hardware", + "inspect", + "load_config", + "recommend", + "run_preflight", + "stats_from_dataset", +] diff --git a/src/autointent/_advisor/_cli.py b/src/autointent/_advisor/_cli.py new file mode 100644 index 000000000..315d5c443 --- /dev/null +++ b/src/autointent/_advisor/_cli.py @@ -0,0 +1,135 @@ +"""Console-script entry point for the pre-flight advisor. + +Two subcommands: + +* ``inspect`` — show what a given preset / config will cost on this machine. +* ``recommend`` — pick the best-fitting bundled preset for this machine. + +Both subcommands accept either a real ``--dataset`` (Hub id or local +csv/json/jsonl/parquet path loaded via ``datasets.load_dataset``) or +``--n-samples / --n-classes / --avg-tokens`` placeholders so the script is +useful before the user has built a dataset. + +The CLI is a thin wrapper around :func:`autointent._advisor.inspect` and +:func:`autointent._advisor.recommend`; callers that don't need argparse can +import those helpers directly. +""" + +from __future__ import annotations + +import argparse +import json +import logging +import sys + +from ._render import render_json, render_recommendation, render_text +from ._report import DatasetStats +from ._workflows import BUNDLED_PRESETS, inspect, recommend, stats_from_dataset + +__all__ = ["BUNDLED_PRESETS", "build_parser", "cmd_inspect", "cmd_recommend", "main"] + +logger = logging.getLogger("autointent.advisor") + + +def _stats_from_args(args: argparse.Namespace) -> DatasetStats: + multilabel = args.task == "multilabel" + if args.dataset: + return stats_from_dataset(args.dataset, multilabel=multilabel) + return DatasetStats.placeholder( + n_samples=args.n_samples, + n_classes=args.n_classes, + avg_tokens=args.avg_tokens, + multilabel=multilabel, + ) + + +def _add_common_dataset_args(p: argparse.ArgumentParser) -> None: + p.add_argument("--dataset", help="Path or hub id of a dataset; overrides placeholders.") + p.add_argument("--n-samples", type=int, default=1_000, help="Placeholder training set size.") + p.add_argument("--n-classes", type=int, default=10, help="Placeholder class count.") + p.add_argument("--avg-tokens", type=int, default=32, help="Placeholder average token length.") + p.add_argument( + "--task", + choices=("multiclass", "multilabel"), + default="multiclass", + help="Placeholder task type when --dataset isn't given.", + ) + + +def cmd_inspect(args: argparse.Namespace) -> int: + report = inspect( + args.target, + stats=_stats_from_args(args), + budget_vram_gb=args.budget_vram_gb, + ) + if args.json: + sys.stdout.write(render_json(report)) + else: + sys.stdout.write(render_text(report)) + sys.stdout.write("\n") + return 0 if report.is_feasible else 1 + + +def cmd_recommend(args: argparse.Namespace) -> int: + result = recommend( + stats=_stats_from_args(args), + budget_vram_gb=args.budget_vram_gb, + budget_time_h=args.budget_time_h, + ) + if args.json: + sys.stdout.write(json.dumps(result.to_dict(), indent=2, default=str)) + sys.stdout.write("\n") + else: + sys.stdout.write(render_recommendation(result.results, result.chosen)) + sys.stdout.write("\n") + if result.chosen: + sys.stdout.write("\n") + sys.stdout.write(render_text(dict(result.results)[result.chosen])) + sys.stdout.write("\n") + return 0 if result.chosen else 1 + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="autointent-advisor", + description="Pre-flight feasibility advisor for AutoIntent search-space optimization.", + ) + parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug logging.") + + sub = parser.add_subparsers(dest="cmd", required=True) + + p_inspect = sub.add_parser( + "inspect", + help="Inspect a preset or OptimizationConfig and print a feasibility report.", + ) + p_inspect.add_argument("target", help="Preset name (e.g. transformers-light) or path to a YAML config.") + p_inspect.add_argument("--json", action="store_true", help="Emit a structured JSON report.") + p_inspect.add_argument("--budget-vram-gb", type=float, default=None, help="Override detected VRAM budget.") + _add_common_dataset_args(p_inspect) + p_inspect.set_defaults(func=cmd_inspect) + + p_rec = sub.add_parser( + "recommend", + help="Detect hardware and recommend the best-fitting bundled preset.", + ) + p_rec.add_argument("--json", action="store_true", help="Emit a structured JSON report.") + p_rec.add_argument("--budget-vram-gb", type=float, default=None, help="Override detected VRAM budget.") + p_rec.add_argument("--budget-time-h", type=float, default=None, help="Optional wall-time ceiling in hours.") + _add_common_dataset_args(p_rec) + p_rec.set_defaults(func=cmd_recommend) + + return parser + + +def main(argv: list[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.WARNING, + format="%(levelname)s %(name)s: %(message)s", + ) + return int(args.func(args)) + + +if __name__ == "__main__": + main() diff --git a/src/autointent/_advisor/_hardware.py b/src/autointent/_advisor/_hardware.py new file mode 100644 index 000000000..6aa9741ee --- /dev/null +++ b/src/autointent/_advisor/_hardware.py @@ -0,0 +1,142 @@ +"""Local hardware detection. + +Probes CPU / RAM / disk and the highest-priority accelerator available +(CUDA -> MPS -> CPU). All probes are wrapped to fall back safely on a +broken install (e.g. CUDA driver mismatch) rather than crash the advisor. +""" + +from __future__ import annotations + +import logging +import os +import platform +import shutil +from dataclasses import dataclass, field +from pathlib import Path +from typing import Literal + +import psutil +import torch + +logger = logging.getLogger(__name__) + +Accelerator = Literal["cuda", "mps", "cpu"] + +# matches macOS PYTORCH_MPS_HIGH_WATERMARK_RATIO default +MPS_DEFAULT_BUDGET_RATIO = 0.7 + +_HIGH_GPU_VRAM_GB = 24 +_MID_GPU_VRAM_GB = 12 +_BYTES_PER_GB = 1024**3 # binary GiB convention; matches all advisor byte->GB conversions + + +@dataclass +class HardwareProfile: + accelerator: Accelerator + device_name: str + vram_gb: float + ram_gb: float + free_disk_gb: float + cpu_count: int + notes: list[str] = field(default_factory=list) + + @property + def device_class(self) -> str: + if self.accelerator == "cpu": + return "cpu" + if self.accelerator == "mps": + return "apple-silicon" + if self.vram_gb >= _HIGH_GPU_VRAM_GB: + return "high-gpu" + if self.vram_gb >= _MID_GPU_VRAM_GB: + return "mid-gpu" + return "low-gpu" + + +def _detect_ram_gb() -> float: + return float(psutil.virtual_memory().total) / _BYTES_PER_GB + + +def _detect_free_disk_gb(path: str | None = None) -> float: + cache = Path(path or os.environ.get("HF_HOME") or Path("~/.cache/huggingface").expanduser()) + probe_path = cache if cache.exists() else Path("~").expanduser() + try: + usage = shutil.disk_usage(probe_path) + return usage.free / _BYTES_PER_GB + except OSError as e: + logger.debug("disk usage probe failed at %s: %s", probe_path, e) + return 0.0 + + +def _detect_cuda() -> tuple[float, str] | None: + if not torch.cuda.is_available(): + return None + idx = 0 + try: + _free, total = torch.cuda.mem_get_info(idx) + vram_gb = total / _BYTES_PER_GB + except (RuntimeError, AttributeError) as e: + logger.debug("torch.cuda.mem_get_info failed: %s", e) + return None + name = torch.cuda.get_device_name(idx) + return vram_gb, name + + +def _detect_mps(ram_gb: float, budget_ratio: float = MPS_DEFAULT_BUDGET_RATIO) -> tuple[float, str] | None: + if not (hasattr(torch.backends, "mps") and torch.backends.mps.is_available()): + return None + # apple silicon: unified memory; budget is fraction of total RAM + return ram_gb * budget_ratio, f"Apple Silicon ({platform.machine()})" + + +def detect_hardware( + *, + vram_budget_gb: float | None = None, + mps_budget_ratio: float = MPS_DEFAULT_BUDGET_RATIO, +) -> HardwareProfile: + """Detect the local hardware, with optional manual overrides. + + Args: + vram_budget_gb: when set, overrides the detected VRAM (use for + shared-GPU machines where part of the device is taken). + mps_budget_ratio: fraction of total RAM treated as the MPS + "VRAM" budget on Apple Silicon. + + Returns: + HardwareProfile reflecting current machine state. + """ + notes: list[str] = [] + ram_gb = _detect_ram_gb() + free_disk_gb = _detect_free_disk_gb() + cpu_count = os.cpu_count() or 1 + + cuda = _detect_cuda() + if cuda is not None: + vram_gb, device_name = cuda + accel: Accelerator = "cuda" + else: + mps = _detect_mps(ram_gb, mps_budget_ratio) + if mps is not None: + vram_gb, device_name = mps + accel = "mps" + notes.append(f"MPS unified memory: VRAM budget = {mps_budget_ratio:.0%} of RAM.") + else: + vram_gb = 0.0 + device_name = platform.processor() or "cpu" + accel = "cpu" + + if vram_budget_gb is not None: + if vram_gb and vram_budget_gb > vram_gb: + notes.append(f"Manual --budget-vram-gb={vram_budget_gb} exceeds detected {vram_gb:.1f} GB; using override.") + notes.append(f"Using manual VRAM budget: {vram_budget_gb} GB.") + vram_gb = vram_budget_gb + + return HardwareProfile( + accelerator=accel, + device_name=device_name, + vram_gb=vram_gb, + ram_gb=ram_gb, + free_disk_gb=free_disk_gb, + cpu_count=cpu_count, + notes=notes, + ) diff --git a/src/autointent/_advisor/_hub.py b/src/autointent/_advisor/_hub.py new file mode 100644 index 000000000..677e6045c --- /dev/null +++ b/src/autointent/_advisor/_hub.py @@ -0,0 +1,213 @@ +"""HF Hub metadata lookups + warm-cache probe. + +Memoized per-process. Offline-safe: every probe falls back to a +heuristic value rather than raising. The advisor flips the report's +``low_confidence`` flag when a fallback is taken. +""" + +from __future__ import annotations + +import json +import logging +from dataclasses import dataclass +from functools import lru_cache +from pathlib import Path +from typing import Literal + +from huggingface_hub import HfApi, hf_hub_download, scan_cache_dir, try_to_load_from_cache + +Confidence = Literal["hub", "heuristic"] + +logger = logging.getLogger(__name__) + +_DEFAULT_HEURISTIC_PARAMS = 110_000_000 +_DEFAULT_BYTES_PER_PARAM = 4 +_BYTES_PER_GB = 1024**3 # using the binary GiB convention everywhere in the advisor + + +@dataclass +class ModelMeta: + name: str + total_params: int + weight_bytes_per_param: float + total_file_bytes: int + cached_locally: bool + confidence: Confidence + hidden_size: int | None = None + n_layers: int | None = None + + @property + def disk_gb(self) -> float: + return self.total_file_bytes / _BYTES_PER_GB + + @property + def weights_gb(self) -> float: + return (self.total_params * self.weight_bytes_per_param) / _BYTES_PER_GB + + +def _shape_from_config(model_name: str) -> tuple[int | None, int | None]: + """Return ``(hidden_size, num_hidden_layers)`` straight from the model's config.json. + + ``hf_hub_download`` caches the file after the first call, so repeated lookups + in the same process (or across CLI invocations) hit local disk. Returns + ``(None, None)`` on any failure — the advisor stays best-effort. + """ + try: + path = hf_hub_download(model_name, "config.json") + except Exception as e: # noqa: BLE001 + logger.debug("config.json download(%s) failed: %s", model_name, e) + return None, None + try: + cfg = json.loads(Path(path).read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError) as e: + logger.debug("config.json parse(%s) failed: %s", model_name, e) + return None, None + # Cover the common HF naming variants: BERT/Llama/Gemma use hidden_size + + # num_hidden_layers; T5/MT5 use d_model + num_layers; GPT-2/Neo use n_embd + n_layer. + hidden = cfg.get("hidden_size") or cfg.get("d_model") or cfg.get("n_embd") + layers = cfg.get("num_hidden_layers") or cfg.get("num_layers") or cfg.get("n_layer") + return int(hidden) if hidden else None, int(layers) if layers else None + + +def _is_warm_cached(model_name: str) -> bool: + """True when the weight shard is present in the local HF cache.""" + weight_files = ["model.safetensors", "pytorch_model.bin", "model.safetensors.index.json"] + for fname in weight_files: + path = try_to_load_from_cache(model_name, fname) + if isinstance(path, str): + return True + + # sharded models won't match the single-file probe; fall back to a scan + try: + cache = scan_cache_dir() + except Exception as e: # noqa: BLE001 + logger.debug("scan_cache_dir failed: %s", e) + return False + return any(repo.repo_id == model_name for repo in cache.repos) + + +def _hub_metadata(model_name: str) -> ModelMeta | None: + try: + info = HfApi().model_info(model_name, files_metadata=True) + except Exception as e: # noqa: BLE001 + logger.debug("model_info(%s) failed: %s", model_name, e) + return None + # Bytes-per-element for safetensors dtype strings. Used to convert the per-dtype + # parameter counts (info.safetensors.parameters) into a weighted average + # bytes-per-param when a checkpoint stores tensors in multiple dtypes. + _dtype_bytes: dict[str, int] = { + "F64": 8, + "F32": 4, + "F16": 2, + "BF16": 2, + "I64": 8, + "I32": 4, + "I16": 2, + "I8": 1, + "U8": 1, + "BOOL": 1, + } + + total_params = 0 + weight_bytes_per_param: float = _DEFAULT_BYTES_PER_PARAM + if info.safetensors is not None: + params_by_dtype = info.safetensors.parameters or {} + total_params = info.safetensors.total or sum(params_by_dtype.values()) + if total_params: + total_weight_bytes = sum( + _dtype_bytes.get(dtype, _DEFAULT_BYTES_PER_PARAM) * count for dtype, count in params_by_dtype.items() + ) + if total_weight_bytes: + weight_bytes_per_param = total_weight_bytes / total_params + + total_file_bytes = sum(s.size for s in (info.siblings or []) if s.size) + + # Track whether either size came from the Hub or from the name-pattern fallback; + # if any field was filled by heuristic, downgrade confidence so the report flips + # low_confidence rather than misreporting hub-grade accuracy. + confidence: Confidence = "hub" + if total_params == 0: + total_params = _DEFAULT_HEURISTIC_PARAMS + confidence = "heuristic" + + if total_file_bytes == 0: + total_file_bytes = int(total_params * weight_bytes_per_param) + confidence = "heuristic" + + hidden_size, n_layers = _shape_from_config(model_name) + if hidden_size is None or n_layers is None: + logger.warning( + "Could not read hidden_size / num_hidden_layers from config.json for %s; " + "activation-memory estimates will fall back to BERT-base defaults (768 / 12).", + model_name, + ) + + return ModelMeta( + name=model_name, + total_params=total_params, + weight_bytes_per_param=weight_bytes_per_param, + total_file_bytes=total_file_bytes, + cached_locally=_is_warm_cached(model_name), + confidence=confidence, + hidden_size=hidden_size, + n_layers=n_layers, + ) + + +def _heuristic_metadata(model_name: str) -> ModelMeta: + logger.warning( + "Falling back to name-pattern heuristic for %s; " + "activation-memory estimates will use BERT-base defaults (hidden=768, layers=12).", + model_name, + ) + total_file_bytes = _DEFAULT_HEURISTIC_PARAMS * _DEFAULT_BYTES_PER_PARAM + return ModelMeta( + name=model_name, + total_params=_DEFAULT_HEURISTIC_PARAMS, + weight_bytes_per_param=_DEFAULT_BYTES_PER_PARAM, + total_file_bytes=total_file_bytes, + cached_locally=_is_warm_cached(model_name), + confidence="heuristic", + ) + + +def _looks_like_local_path(model_name: str) -> bool: + """True when ``model_name`` is a filesystem path rather than an HF Hub repo id. + + Hub repo ids match ``org/repo``; anything that starts with a path separator, + ``~``, a relative-path prefix, or a Windows drive letter, or contains a + backslash, is treated as a local path. We can't rely on ``Path.is_absolute()`` + alone because POSIX-style absolute paths (``/tmp/...``) are *not* absolute + on Windows. + """ + if model_name.startswith(("local:", "/", "~", "./", "../", "\\\\")): + return True + if "\\" in model_name: + return True + return len(model_name) >= 2 and model_name[1] == ":" and model_name[0].isalpha() # noqa: PLR2004 + + +@lru_cache(maxsize=64) +def resolve_model(model_name: str) -> ModelMeta: + """Resolve metadata for a single model name. Memoized per process. + + Always returns a value — never raises — so the advisor can keep going + on offline machines or for unknown checkpoints. + """ + if _looks_like_local_path(model_name): + return ModelMeta( + name=model_name, + total_params=_DEFAULT_HEURISTIC_PARAMS, + weight_bytes_per_param=_DEFAULT_BYTES_PER_PARAM, + total_file_bytes=0, + cached_locally=True, + confidence="heuristic", + ) + + # _hub_metadata returns None on any failure (network outage, missing repo, + # SDK exception) so we don't need a separate up-front probe. + meta = _hub_metadata(model_name) + if meta is not None: + return meta + + return _heuristic_metadata(model_name) diff --git a/src/autointent/_advisor/_render.py b/src/autointent/_advisor/_render.py new file mode 100644 index 000000000..afd541e2b --- /dev/null +++ b/src/autointent/_advisor/_render.py @@ -0,0 +1,154 @@ +"""Rendering for the pre-flight report. + +Text output is grouped by phase (Resource / Data / Config) plus a Drivers +section and the always-on disclaimer. JSON output dumps the structured +report straight through. +""" + +from __future__ import annotations + +import json +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from ._report import PreflightReport + +_SEVERITY_TAG = {"ample": "✓", "tight": "⚠", "over": "x"} +_PHASE_ORDER = ("resource", "data", "config") +_PHASE_LABEL = {"resource": "Resource", "data": "Data", "config": "Config"} + + +def _batch_hint(driver: dict[str, Any]) -> str: + """Per-driver batch annotation: '64 -> 32', '64', '64 (no fit)', or ''.""" + bs = driver.get("batch_size") + if bs is None: + return "" + mx = driver.get("max_batch_size") + if mx is None: + return str(bs) + if mx == 0: + return f"{bs} (no fit)" + if mx == bs: + return str(bs) + return f"{bs} -> {mx}" + + +_DRIVERS_LIMIT = 8 +_DRIVERS_HEADERS = ("Node", "Model", "Mode", "VRAM", "Time", "Batch", "Source") + + +def _render_drivers_table(drivers: list[dict[str, Any]]) -> list[str]: + """Format the Drivers of cost section as an aligned table.""" + visible = drivers[:_DRIVERS_LIMIT] + rows: list[tuple[str, ...]] = [ + ( + f"{d['node_type']}.{d['module']}", + str(d["model"]), + str(d["mode"]), + f"{d['vram_gb']:.2f} GB", + f"{d['time_hours']:.2f} h", + _batch_hint(d), + f"[{d['confidence']}]", + ) + for d in visible + ] + + widths = [len(h) for h in _DRIVERS_HEADERS] + for row in rows: + for i, cell in enumerate(row): + widths[i] = max(widths[i], len(cell)) + + # Right-align numeric columns (VRAM @ idx 3, Time @ idx 4); left-align the rest. + right_align = {3, 4} + + def fmt(row: tuple[str, ...]) -> str: + cells = [] + for i, cell in enumerate(row): + if i in right_align: + cells.append(cell.rjust(widths[i])) + else: + cells.append(cell.ljust(widths[i])) + return " " + " ".join(cells).rstrip() + + lines = ["Drivers of cost:", fmt(_DRIVERS_HEADERS), " " + " ".join("─" * w for w in widths)] + lines.extend(fmt(r) for r in rows) + if len(drivers) > _DRIVERS_LIMIT: + lines.append(f" … and {len(drivers) - _DRIVERS_LIMIT} more") + return lines + + +def render_text(report: PreflightReport) -> str: + lines: list[str] = [] + title = "Compute feasibility check" + if report.preset_name: + title += f" — {report.preset_name}" + lines.append(title) + lines.append("─" * len(title)) + + hw = report.hardware + lines.append( + f"Hardware: {hw.get('accelerator', '?')} ({hw.get('device_name', '?')})," + f" {hw.get('vram_gb', 0):.1f} GB VRAM, {hw.get('ram_gb', 0):.0f} GB RAM," + f" {hw.get('free_disk_gb', 0):.0f} GB free disk" + ) + ds = report.dataset + lines.append( + f"Dataset: n_samples={ds.get('n_samples')}, n_classes={ds.get('n_classes')}," + f" avg_tokens={ds.get('avg_tokens')} ({ds.get('source')})" + ) + lines.append("") + + for phase in _PHASE_ORDER: + bucket = [f for f in report.findings if f.phase == phase] + if not bucket: + continue + lines.append(f"{_PHASE_LABEL[phase]}:") + for f in bucket: + tag = _SEVERITY_TAG.get(f.severity.value, "·") + lines.append(f" {tag} {f.message}") + lines.append("") + + if report.resource.drivers: + lines.extend(_render_drivers_table(report.resource.drivers)) + lines.append("") + + if report.notes: + lines.append("Notes:") + lines.extend(f" • {note}" for note in report.notes) + lines.append("") + + summary = f"Verdict: {'feasible' if report.is_feasible else 'INFEASIBLE'} " + summary += f"(headroom: {report.headroom.value})" + if report.low_confidence: + summary += " — low-confidence (heuristic fallback in use)" + lines.append(summary) + lines.append("Note: estimates are heuristic upper bounds, not measurements.") + return "\n".join(lines) + + +def render_json(report: PreflightReport) -> str: + return json.dumps(report.to_dict(), indent=2, default=str) + + +def render_recommendation( + results: list[tuple[str, PreflightReport]], + chosen: str | None, +) -> str: + """Compact table for the ``recommend`` subcommand.""" + lines = ["", "Recommendation:"] + if chosen: + lines.append(f" -> {chosen}") + else: + lines.append(" -> none of the bundled presets fit your hardware as-is.") + lines.append("") + lines.append(f"{'Preset':<24} {'Status':<14} {'VRAM':<10} {'Time':<10} {'Headroom':<10}") + lines.append("-" * 68) + for name, report in results: + verdict = "feasible" if report.is_feasible else "infeasible" + lines.append( + f"{name:<24} {verdict:<14} " + f"{report.resource.vram_gb:>4.1f} GB " + f"{report.resource.time_hours:>4.1f} h " + f"{report.headroom.value:<8}" + ) + return "\n".join(lines) diff --git a/src/autointent/_advisor/_report.py b/src/autointent/_advisor/_report.py new file mode 100644 index 000000000..c9fd920f4 --- /dev/null +++ b/src/autointent/_advisor/_report.py @@ -0,0 +1,130 @@ +"""Dataclasses for the pre-flight advisor's structured report.""" + +from __future__ import annotations + +from dataclasses import asdict, dataclass, field +from enum import Enum +from typing import Any, Literal + + +class Severity(str, Enum): + AMPLE = "ample" + TIGHT = "tight" + OVER = "over" + + +Phase = Literal["resource", "data", "config"] + + +@dataclass(frozen=True) +class Finding: + """A single advisor finding rendered as one line in the summary.""" + + phase: Phase + severity: Severity + message: str + metric: str | None = None + + +@dataclass +class ResourceEstimate: + """Aggregated resource numbers across the search space.""" + + disk_download_gb: float = 0.0 + disk_cached_gb: float = 0.0 + disk_dump_gb: float = 0.0 + ram_gb: float = 0.0 + vram_gb: float = 0.0 + time_hours: float = 0.0 + parallel_factor: int = 1 + drivers: list[dict[str, Any]] = field(default_factory=list) + + @property + def total_disk_gb(self) -> float: + return self.disk_download_gb + self.disk_dump_gb + + +@dataclass +class DatasetStats: + """Minimal stats the advisor needs about the user's dataset. + + Built either from a real ``Dataset`` or from CLI placeholder flags. + """ + + n_samples: int + n_classes: int + avg_tokens: int + p95_tokens: int | None = None + multilabel: bool = False + has_descriptions: bool | None = None + rare_classes: list[str] = field(default_factory=list) + source: str = "placeholder" + + @classmethod + def placeholder( + cls, + n_samples: int = 1_000, + n_classes: int = 10, + avg_tokens: int = 32, + multilabel: bool = False, + ) -> DatasetStats: + return cls( + n_samples=n_samples, + n_classes=n_classes, + avg_tokens=avg_tokens, + p95_tokens=int(avg_tokens * 2.5), + multilabel=multilabel, + ) + + +@dataclass +class PreflightReport: + """One report covering all three phases.""" + + findings: list[Finding] = field(default_factory=list) + resource: ResourceEstimate = field(default_factory=ResourceEstimate) + hardware: dict[str, Any] = field(default_factory=dict) + dataset: dict[str, Any] = field(default_factory=dict) + preset_name: str | None = None + low_confidence: bool = False + notes: list[str] = field(default_factory=list) + + def add(self, phase: Phase, severity: Severity, message: str, metric: str | None = None) -> None: + self.findings.append(Finding(phase=phase, severity=severity, message=message, metric=metric)) + + @property + def headroom(self) -> Severity: + """Worst headroom level across all findings — the column shown in CLI reports.""" + order = {Severity.AMPLE: 0, Severity.TIGHT: 1, Severity.OVER: 2} + if not self.findings: + return Severity.AMPLE + return max((f.severity for f in self.findings), key=lambda s: order[s]) + + @property + def is_feasible(self) -> bool: + return self.headroom != Severity.OVER + + def to_dict(self) -> dict[str, Any]: + d = asdict(self) + d["findings"] = [{**asdict(f), "severity": f.severity.value} for f in self.findings] + d["headroom"] = self.headroom.value + d["is_feasible"] = self.is_feasible + return d + + +@dataclass +class RecommendationResult: + """Output of the recommend workflow: ranked per-preset reports plus the pick. + + ``chosen`` is the best feasible preset name, or ``None`` if none fit. + ``results`` is the full per-preset report list in evaluation order. + """ + + chosen: str | None + results: list[tuple[str, PreflightReport]] + + def to_dict(self) -> dict[str, Any]: + return { + "chosen": self.chosen, + "results": [{"preset": name, "report": r.to_dict()} for name, r in self.results], + } diff --git a/src/autointent/_advisor/workflows.py b/src/autointent/_advisor/workflows.py new file mode 100644 index 000000000..2331e225a --- /dev/null +++ b/src/autointent/_advisor/workflows.py @@ -0,0 +1,230 @@ +"""High-level advisor workflows: ``inspect`` and ``recommend``. + +Each workflow orchestrates the lower-level pieces (``load_config``, +``detect_hardware``, ``stats_from_dataset``, ``run_preflight``) into a single +typed call. They expose the same logic the CLI uses but accept Python +arguments instead of an ``argparse.Namespace`` — useful from notebooks, +integration tests, or any caller that wants a ``PreflightReport`` / +``RecommendationResult`` directly. +""" + +from __future__ import annotations + +import logging +from pathlib import Path +from typing import TYPE_CHECKING, Any, get_args + +import yaml +from datasets import ClassLabel, Sequence, load_dataset + +from autointent.custom_types import SearchSpacePreset +from autointent.utils import load_preset + +from ._hardware import detect_hardware +from ._report import DatasetStats, RecommendationResult, Severity +from .runner import run_preflight + +if TYPE_CHECKING: + from collections.abc import Iterable + + from ._report import PreflightReport + + +logger = logging.getLogger("autointent.advisor") + +_SAMPLE_LIMIT = 1000 +_P95_PERCENTILE = 0.95 +BUNDLED_PRESETS: tuple[str, ...] = get_args(SearchSpacePreset) + + +def load_config(target: str) -> tuple[dict[str, Any], str]: + """Return ``(config_dict, friendly_name)`` for either a preset name or a YAML path.""" + path = Path(target) + if path.is_file(): + with path.open(encoding="utf-8") as f: + return yaml.safe_load(f), path.stem + return load_preset(target), target # type: ignore[arg-type] + + +def stats_from_dataset(path: str, *, multilabel: bool = False) -> DatasetStats: + """Best-effort: load a dataset via HF ``datasets.load_dataset`` and derive advisor stats. + + Accepts a Hub repo id (``DeepPavlov/clinc150``) or a local file path + (``.csv`` / ``.json`` / ``.jsonl`` / ``.parquet``) / dataset directory. Falls + back to a placeholder on any loader error so callers stay best-effort. + """ + # Anything not in this map (no suffix, unknown suffix) is treated as a Hub + # repo id or a dataset directory and passed to load_dataset directly. + file_builders = {".csv": "csv", ".tsv": "csv", ".json": "json", ".jsonl": "json", ".parquet": "parquet"} + builder = file_builders.get(Path(path).suffix.lower()) + try: + ds = load_dataset(builder, data_files=path) if builder else load_dataset(path) + except (OSError, ValueError, FileNotFoundError) as e: + logger.warning("Failed to load dataset %s: %s", path, e) + return DatasetStats.placeholder(multilabel=multilabel) + + train = ds["train"] if "train" in ds else next(iter(ds.values()), None) + if train is None: + return DatasetStats.placeholder(multilabel=multilabel) + + cols = train.column_names + utt_col = next( + (c for c in ("utterance", "text", "sentence", "query", "input") if c in cols), cols[0] if cols else None + ) + label_col = next((c for c in ("label", "labels", "intent", "target") if c in cols), None) + + detected_multilabel, n_classes = _label_shape(train, label_col, fallback_multilabel=multilabel) + + sample = train[:_SAMPLE_LIMIT] if len(train) > _SAMPLE_LIMIT else train[:] + lengths = [len(str(s).split()) for s in (sample.get(utt_col, []) if utt_col else [])] + avg_tokens = int(sum(lengths) / max(1, len(lengths))) if lengths else 32 + if lengths: + sorted_lengths = sorted(lengths) + idx = max(0, min(len(sorted_lengths) - 1, round((len(sorted_lengths) - 1) * _P95_PERCENTILE))) + p95 = sorted_lengths[idx] + else: + p95 = avg_tokens * 2 + + return DatasetStats( + n_samples=len(train), + n_classes=n_classes, + avg_tokens=avg_tokens, + p95_tokens=p95, + multilabel=detected_multilabel, + has_descriptions=None, + rare_classes=_rare_classes(train, label_col, detected_multilabel, n_classes) if label_col else [], + source=f"dataset:{path}", + ) + + +def _label_shape(train: Any, label_col: str | None, *, fallback_multilabel: bool) -> tuple[bool, int]: # noqa: ANN401 + """Derive ``(multilabel, n_classes)`` from the HF feature schema with a value-based fallback.""" + if label_col is None: + return fallback_multilabel, 0 + feature = train.features.get(label_col) + if isinstance(feature, Sequence): + inner = feature.feature + if isinstance(inner, ClassLabel): + return True, inner.num_classes + # Sequence of plain ints — n_classes = max label index + 1. + max_idx = max((max(row) for row in train[label_col] if row), default=-1) + return True, max_idx + 1 + if isinstance(feature, ClassLabel): + return False, feature.num_classes + # Plain int/string column. Detect multilabel from the first non-empty row, then count uniques. + is_multi = len(train) > 0 and isinstance(train[0][label_col], (list, tuple)) + if is_multi: + max_idx = max((max(row) for row in train[label_col] if row), default=-1) + return True, max_idx + 1 + return False, len({label for label in train[label_col] if label is not None}) + + +def _rare_classes( + train: Any, # noqa: ANN401 + label_col: str, + multilabel: bool, + n_classes: int, + min_count: int = 3, +) -> list[str]: + """Return labels with fewer than ``min_count`` samples in the train split. + + Used to surface the LogisticRegressionCV(cv=3) failure case before fit. + Returns an empty list on any error so the advisor stays best-effort. + """ + try: + labels = train[label_col] + except (KeyError, AttributeError, TypeError): + return [] + counts: dict[str, int] = {} + if multilabel: + for row in labels: + if not row: + continue + for i, v in enumerate(row): + if v: + counts[str(i)] = counts.get(str(i), 0) + 1 + for i in range(n_classes): + counts.setdefault(str(i), 0) + else: + for label in labels: + counts[str(label)] = counts.get(str(label), 0) + 1 + return sorted(name for name, c in counts.items() if c < min_count) + + +def inspect( + target: str, + *, + stats: DatasetStats | None = None, + budget_vram_gb: float | None = None, +) -> PreflightReport: + """Inspect a preset (or YAML config path) against the local hardware. + + Args: + target: Bundled preset name (e.g. ``'transformers-light'``) or a YAML + config path. The friendly name surfaced in the report is the file + stem for paths and the preset name otherwise. + stats: Dataset stats to score against. Defaults to a placeholder if + ``None``. + budget_vram_gb: Optional VRAM-budget override for the hardware probe. + + Returns: + ``PreflightReport`` covering resource / data / config phases. + """ + config, name = load_config(target) + hardware = detect_hardware(vram_budget_gb=budget_vram_gb) + return run_preflight(config, stats or DatasetStats.placeholder(), hardware, preset_name=name) + + +def recommend( + *, + stats: DatasetStats | None = None, + presets: Iterable[str] | None = None, + budget_vram_gb: float | None = None, + budget_time_h: float | None = None, +) -> RecommendationResult: + """Walk bundled presets and return the best feasible fit plus all per-preset reports. + + Args: + stats: Dataset stats to score against. Defaults to a placeholder if ``None``. + presets: Override of the preset list (defaults to ``BUNDLED_PRESETS``). + budget_vram_gb: Optional VRAM-budget override for the hardware probe. + budget_time_h: Optional wall-time ceiling in hours; presets exceeding it + get an extra ``Severity.OVER`` finding so they drop out of the + feasible ranking. + + Returns: + ``RecommendationResult`` with the chosen preset name and full results list. + + Note: + Among feasible presets we pick the heaviest one that still fits the + hardware budget — "use what you have" semantics. This is a *cost* + ranking, not a quality ranking: a heavier preset is not strictly better + and may overfit on small datasets where a classic-* preset would win on + accuracy. Override ``presets=`` if you want a different ranking. + """ + hardware = detect_hardware(vram_budget_gb=budget_vram_gb) + stats = stats or DatasetStats.placeholder() + preset_iter = list(presets) if presets is not None else BUNDLED_PRESETS + + results: list[tuple[str, PreflightReport]] = [] + for preset in preset_iter: + try: + cfg = load_preset(preset) # type: ignore[arg-type] + except (OSError, ValueError, KeyError) as e: + logger.debug("Skipping preset %s: %s", preset, e) + continue + report = run_preflight(cfg, stats, hardware, preset_name=preset) + if budget_time_h is not None and report.resource.time_hours > budget_time_h: + report.add( + "resource", + Severity.OVER, + f"Estimated time {report.resource.time_hours:.1f} h exceeds budget {budget_time_h} h.", + ) + results.append((preset, report)) + + cost_rank = {name: i for i, name in enumerate(BUNDLED_PRESETS)} + feasible = [(name, r) for name, r in results if r.is_feasible] + feasible.sort(key=lambda pair: (cost_rank.get(pair[0], len(BUNDLED_PRESETS)), pair[0])) + chosen = feasible[0][0] if feasible else None + + return RecommendationResult(chosen=chosen, results=results) diff --git a/src/autointent/custom_types/_types.py b/src/autointent/custom_types/_types.py index cbfa82576..59e6b87a3 100644 --- a/src/autointent/custom_types/_types.py +++ b/src/autointent/custom_types/_types.py @@ -117,18 +117,25 @@ class Split: """ SearchSpacePreset = Literal[ - "classic-heavy", - "classic-light", - "classic-medium", - "nn-heavy", - "nn-medium", "transformers-heavy", "transformers-light", - "transformers-no-hpo", + "nn-heavy", "zero-shot-llm", + "nn-medium", + "classic-heavy", + "transformers-no-hpo", + "classic-medium", "zero-shot-encoders", + "classic-light", ] -"""Some presets that our library supports.""" +"""Bundled search-space presets, listed in descending resource-cost order. + +Heavier presets explore more / larger models and take longer to run. The order +is a cost ranking, **not** a quality ranking: a heavier preset is not strictly +better — e.g. ``transformers-heavy`` will overfit on tiny datasets where a +classic-* preset wins on accuracy. ``autointent._advisor.recommend`` uses this +ordering to pick the heaviest preset that still fits the hardware budget, +which is a reasonable default but not always the right choice for the data.""" class Document(BaseModel): diff --git a/tests/advisor/__init__.py b/tests/advisor/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/advisor/test_estimates_and_cli.py b/tests/advisor/test_estimates_and_cli.py new file mode 100644 index 000000000..d87f90740 --- /dev/null +++ b/tests/advisor/test_estimates_and_cli.py @@ -0,0 +1,225 @@ +"""End-to-end smoke tests for the advisor. + +These run offline — HF Hub probes are monkeypatched to fail so the +advisor falls back to its name-pattern heuristics. Verifies that: + +* every bundled preset can be inspected without raising; +* the recommend subcommand picks something on a generous budget and + nothing on a hostile one; +* ``--json`` emits parseable JSON. +""" + +from __future__ import annotations + +import json +import sys + +import pytest + +from autointent._advisor import DatasetStats, HardwareProfile, run_preflight +from autointent._advisor._cli import BUNDLED_PRESETS, main +from autointent.utils import load_preset + + +@pytest.fixture(autouse=True) +def _force_offline(monkeypatch: pytest.MonkeyPatch) -> None: + """Force HF Hub lookups to fail so tests don't hit the network.""" + from autointent._advisor import _hub + + _hub.resolve_model.cache_clear() + monkeypatch.setattr(_hub, "_hub_metadata", lambda _name: None) + + +def _profile(vram_gb: float = 16.0) -> HardwareProfile: + return HardwareProfile( + accelerator="cuda" if vram_gb > 0 else "cpu", + device_name="test-gpu" if vram_gb > 0 else "test-cpu", + vram_gb=vram_gb, + ram_gb=32.0, + free_disk_gb=200.0, + cpu_count=8, + ) + + +@pytest.mark.parametrize("preset", BUNDLED_PRESETS) +def test_every_preset_inspects_without_raising(preset: str) -> None: + cfg = load_preset(preset) # type: ignore[arg-type] + stats = DatasetStats.placeholder(n_samples=500, n_classes=10, avg_tokens=24) + report = run_preflight(cfg, stats, _profile(vram_gb=16.0), preset_name=preset) + assert report.preset_name == preset + # always at least one resource-phase finding + assert any(f.phase == "resource" for f in report.findings) + + +def test_heavy_preset_is_infeasible_on_2gb_budget() -> None: + cfg = load_preset("transformers-heavy") + stats = DatasetStats.placeholder(n_samples=5000, n_classes=20, avg_tokens=40) + report = run_preflight(cfg, stats, _profile(vram_gb=2.0), preset_name="transformers-heavy") + assert not report.is_feasible, "deberta-v3-large should not fit in 2 GB" + + +def test_light_preset_is_feasible_on_8gb_budget() -> None: + cfg = load_preset("transformers-light") + stats = DatasetStats.placeholder(n_samples=1000, n_classes=10, avg_tokens=24) + report = run_preflight(cfg, stats, _profile(vram_gb=8.0), preset_name="transformers-light") + assert report.is_feasible + + +def test_n_jobs_doubles_vram_findings() -> None: + cfg = load_preset("transformers-light") + cfg = {**cfg, "hpo_config": {**(cfg.get("hpo_config") or {}), "n_jobs": 4}} + stats = DatasetStats.placeholder() + report = run_preflight(cfg, stats, _profile(vram_gb=4.0)) + assert any("parallel trials" in f.message for f in report.findings) + assert any(f.phase == "config" and "n_jobs" in f.message for f in report.findings) + + +def test_cli_inspect_json_is_parseable(capsys: pytest.CaptureFixture[str]) -> None: + rc = main( + [ + "inspect", + "transformers-light", + "--n-samples", + "500", + "--n-classes", + "5", + "--avg-tokens", + "20", + "--json", + "--budget-vram-gb", + "16", + ] + ) + captured = capsys.readouterr() + payload = json.loads(captured.out) + assert payload["preset_name"] == "transformers-light" + assert "findings" in payload + assert payload["headroom"] in {"ample", "tight", "over"} + # rc is 0 on feasible, 1 otherwise + assert rc in (0, 1) + + +def test_cli_inspect_text_runs(capsys: pytest.CaptureFixture[str]) -> None: + main( + [ + "inspect", + "transformers-light", + "--n-samples", + "200", + "--n-classes", + "5", + "--avg-tokens", + "15", + "--budget-vram-gb", + "16", + ] + ) + out = capsys.readouterr().out + assert "Compute feasibility check" in out + assert "Verdict:" in out + + +def test_cli_recommend_picks_a_preset_on_generous_hardware( + capsys: pytest.CaptureFixture[str], +) -> None: + rc = main( + [ + "recommend", + "--n-samples", + "1000", + "--n-classes", + "10", + "--avg-tokens", + "20", + "--budget-vram-gb", + "24", + ] + ) + out = capsys.readouterr().out + assert "Recommendation:" in out + assert rc == 0 + + +def test_partial_descriptions_with_description_scorer_flags_red() -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + {"module_name": "description_bi"}, + ], + } + ], + } + stats = DatasetStats( + n_samples=500, + n_classes=10, + avg_tokens=24, + has_descriptions=False, + ) + report = run_preflight(cfg, stats, _profile(vram_gb=16.0)) + assert any(f.phase == "data" and "description" in f.message.lower() for f in report.findings) + + +def test_long_dataset_triggers_truncation_warning() -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "classification_model_config": [{"model_name": "microsoft/deberta-v3-small"}], + "max_length": [128], + } + ], + } + ], + } + stats = DatasetStats( + n_samples=500, + n_classes=10, + avg_tokens=80, + p95_tokens=512, # well over 128 + ) + report = run_preflight(cfg, stats, _profile(vram_gb=16.0)) + assert any("truncation" in f.message.lower() for f in report.findings) + + +def test_cli_recommend_budget_time_flags_red_for_overbudget_presets( + capsys: pytest.CaptureFixture[str], +) -> None: + """Tight time budget must flag every preset that exceeds it with RED severity. + + Previously the budget path used a tautological severity expression and the + breach never escalated the finding — covers the regression.""" + main( + [ + "recommend", + "--n-samples", + "1000", + "--n-classes", + "10", + "--avg-tokens", + "20", + "--budget-vram-gb", + "48", + "--budget-time-h", + "0.0001", + "--json", + ] + ) + payload = json.loads(capsys.readouterr().out) + flagged = [ + r + for r in payload["results"] + if any(f["severity"] == "over" and "exceeds budget" in f["message"] for f in r["report"]["findings"]) + ] + assert flagged, "budget-time-h breach should produce OVER severity findings" + # Any preset above the budget must be marked infeasible. + for r in flagged: + assert r["report"]["is_feasible"] is False + + +if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-v"])) diff --git a/tests/advisor/test_estimates_internals.py b/tests/advisor/test_estimates_internals.py new file mode 100644 index 000000000..3d4e4e526 --- /dev/null +++ b/tests/advisor/test_estimates_internals.py @@ -0,0 +1,608 @@ +"""Targeted tests for `_estimates` helpers + edge cases of `run_preflight`.""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from autointent._advisor import _estimates, _hub +from autointent._advisor._estimates import ( + _classify_severity, + _extract_model_names, + _max_int, + _ram_for_module, + _vram_for_transformer, + run_preflight, +) +from autointent._advisor._hardware import HardwareProfile +from autointent._advisor._hub import ModelMeta +from autointent._advisor._report import DatasetStats, Severity + +# Per-name ModelMeta fixtures used by the offline tests. Production resolution +# (HF Hub config.json + safetensors metadata) is mocked away so the batch-fit +# math doesn't depend on whatever fallback the heuristic path returns. +_FAKE_SHAPES: dict[str, tuple[int, int, int]] = { + # (total_params, hidden_size, n_layers) + "microsoft/deberta-v3-large": (350_000_000, 1024, 24), + "microsoft/deberta-v3-small": (140_000_000, 768, 6), + "sentence-transformers/all-MiniLM-L6-v2": (33_000_000, 384, 6), + "intfloat/multilingual-e5-large-instruct": (560_000_000, 1024, 24), +} + + +def _fake_resolve(model_name: str) -> ModelMeta: + known = _FAKE_SHAPES.get(model_name) + params, hidden, layers = known or (110_000_000, 768, 12) + return ModelMeta( + name=model_name, + total_params=params, + weight_bytes_per_param=4, + total_file_bytes=params * 4, + cached_locally=False, + confidence="hub" if known else "heuristic", + hidden_size=hidden, + n_layers=layers, + ) + + +@pytest.fixture(autouse=True) +def _offline(monkeypatch: pytest.MonkeyPatch) -> None: + _hub.resolve_model.cache_clear() + monkeypatch.setattr(_hub, "_is_warm_cached", lambda _name: False) + # Inject deterministic ModelMeta per name; both the _hub re-export and the + # _estimates rebinding need to be replaced for run_preflight to pick it up. + monkeypatch.setattr(_hub, "resolve_model", _fake_resolve) + monkeypatch.setattr(_estimates, "resolve_model", _fake_resolve) + + +def _profile(vram_gb: float = 16.0, accelerator: str = "cuda") -> HardwareProfile: + return HardwareProfile( + accelerator=accelerator, # type: ignore[arg-type] + device_name=f"test-{accelerator}", + vram_gb=vram_gb, + ram_gb=32.0, + free_disk_gb=200.0, + cpu_count=8, + ) + + +class TestMaxInt: + def test_none_returns_default(self) -> None: + assert _max_int(None, 7) == 7 + + def test_list_picks_max(self) -> None: + assert _max_int([1, 5, 3], 0) == 5 + + def test_range_dict_uses_high(self) -> None: + assert _max_int({"low": 1, "high": 9}, 0) == 9 + + def test_scalar_int_passes_through(self) -> None: + assert _max_int(42, 0) == 42 + + def test_garbage_returns_default(self) -> None: + assert _max_int("not-a-number", 11) == 11 + + +class TestExtractModelNames: + def test_classification_model_config_as_list(self) -> None: + entry = {"classification_model_config": [{"model_name": "foo/bar"}]} + assert _extract_model_names(entry) == ["foo/bar"] + + def test_classification_model_config_as_dict(self) -> None: + entry = {"classification_model_config": {"model_name": "foo/bar"}} + assert _extract_model_names(entry) == ["foo/bar"] + + def test_embedder_config_picked_up(self) -> None: + entry = {"embedder_config": [{"model_name": "e/b"}]} + assert _extract_model_names(entry) == ["e/b"] + + def test_multiple_choices_all_returned(self) -> None: + entry = { + "classification_model_config": [ + {"model_name": "a/x"}, + {"model_name": "b/y"}, + ] + } + assert _extract_model_names(entry) == ["a/x", "b/y"] + + def test_empty_entry(self) -> None: + assert _extract_model_names({}) == [] + + +class TestClassifySeverity: + def test_below_yellow_is_green(self) -> None: + assert _classify_severity(estimate=1.0, budget=10.0) == Severity.AMPLE + + def test_above_yellow_threshold(self) -> None: + assert _classify_severity(estimate=9.5, budget=10.0) == Severity.TIGHT + + def test_at_or_above_red_threshold(self) -> None: + assert _classify_severity(estimate=10.0, budget=10.0) == Severity.OVER + assert _classify_severity(estimate=12.0, budget=10.0) == Severity.OVER + + def test_zero_budget_returns_yellow(self) -> None: + assert _classify_severity(estimate=1.0, budget=0.0) == Severity.TIGHT + + +class TestVramForTransformer: + @pytest.fixture + def meta(self) -> ModelMeta: + return ModelMeta( + name="x", + total_params=100_000_000, + weight_bytes_per_param=4, + total_file_bytes=0, + cached_locally=False, + confidence="hub", + ) + + def test_full_finetune_is_larger_than_lora_is_larger_than_inference(self, meta: ModelMeta) -> None: + inference = _vram_for_transformer(meta, "inference") + lora = _vram_for_transformer(meta, "lora") + full = _vram_for_transformer(meta, "full-finetune") + assert inference < lora < full + + def test_inference_activations_are_smaller_than_training(self, meta: ModelMeta) -> None: + """Inference doesn't store per-layer outputs for backward — activation memory + should be many times smaller than training at the same batch_size.""" + train_total = _vram_for_transformer(meta, "full-finetune", batch_size=64, seq_len=128) + train_weights = _vram_for_transformer(meta, "full-finetune", batch_size=0) + inf_total = _vram_for_transformer(meta, "inference", batch_size=64, seq_len=128) + inf_weights = _vram_for_transformer(meta, "inference", batch_size=0) + train_acts = train_total - train_weights + inf_acts = inf_total - inf_weights + assert inf_acts > 0 + assert train_acts > inf_acts + # 12-layer model: training activations should be at least ~5x inference. + assert train_acts / inf_acts > 5 + + +def test_ram_scales_with_dataset_size() -> None: + meta = ModelMeta( + name="x", + total_params=100_000_000, + weight_bytes_per_param=4, + total_file_bytes=0, + cached_locally=False, + confidence="hub", + ) + small = _ram_for_module(meta, DatasetStats.placeholder(n_samples=100)) + big = _ram_for_module(meta, DatasetStats.placeholder(n_samples=10_000_000, avg_tokens=128)) + assert big > small + + +class TestRunPreflightFeatures: + def test_dump_modules_adds_disk_during_training(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "classification_model_config": [{"model_name": "microsoft/deberta-v3-small"}], + "num_train_epochs": [3], + "batch_size": [16], + } + ], + } + ], + "hpo_config": {"n_trials": 5}, + "logging_config": {"dump_modules": True}, + } + report = run_preflight(cfg, DatasetStats.placeholder(), _profile()) + assert report.resource.disk_dump_gb > 0 + assert any("during training" in f.message for f in report.findings) + + def test_refit_after_increases_time(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "classification_model_config": [{"model_name": "microsoft/deberta-v3-small"}], + "num_train_epochs": [3], + "batch_size": [16], + } + ], + } + ], + "hpo_config": {"n_trials": 10}, + } + baseline = run_preflight(cfg, DatasetStats.placeholder(), _profile()) + bumped = run_preflight(cfg, DatasetStats.placeholder(), _profile(), refit_after=True) + assert bumped.resource.time_hours > baseline.resource.time_hours + + def test_catboost_gpu_without_cuda_flags_config(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + {"module_name": "catboost", "task_type": "GPU"}, + ], + } + ], + } + report = run_preflight(cfg, DatasetStats.placeholder(), _profile(accelerator="cpu")) + assert any(f.phase == "config" and "CatBoost" in f.message for f in report.findings) + + def test_catboost_gpu_with_cuda_is_silent(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + {"module_name": "catboost", "task_type": "GPU"}, + ], + } + ], + } + report = run_preflight(cfg, DatasetStats.placeholder(), _profile(accelerator="cuda")) + assert not any(f.phase == "config" and "CatBoost" in f.message for f in report.findings) + + def test_offline_flips_low_confidence(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "classification_model_config": [{"model_name": "any/model"}], + } + ], + } + ] + } + report = run_preflight(cfg, DatasetStats.placeholder(), _profile()) + assert report.low_confidence is True + assert any("Heuristic fallback" in n for n in report.notes) + + def test_rare_classes_with_linear_scorer_flag_red(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + {"module_name": "linear"}, + ], + } + ] + } + stats = DatasetStats( + n_samples=20, + n_classes=5, + avg_tokens=10, + rare_classes=["intent_a", "intent_b"], + ) + report = run_preflight(cfg, stats, _profile()) + assert any( + f.phase == "data" and "LogisticRegressionCV" in f.message and f.severity == Severity.OVER + for f in report.findings + ) + + def test_truncation_red_when_p95_dominates_max_length(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "max_length": [128], + "classification_model_config": [{"model_name": "some/model"}], + } + ], + } + ] + } + stats = DatasetStats(n_samples=500, n_classes=5, avg_tokens=50, p95_tokens=400) + report = run_preflight(cfg, stats, _profile()) + red = [f for f in report.findings if f.phase == "data" and f.severity == Severity.OVER] + assert red, "p95=400 > 1.5 * max_length=128 should be red" + + def test_truncation_yellow_when_p95_only_slightly_exceeds(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "max_length": [128], + "classification_model_config": [{"model_name": "some/model"}], + } + ], + } + ] + } + stats = DatasetStats(n_samples=500, n_classes=5, avg_tokens=50, p95_tokens=140) + report = run_preflight(cfg, stats, _profile()) + yellows = [ + f + for f in report.findings + if f.phase == "data" and f.severity == Severity.TIGHT and "truncation" in f.message.lower() + ] + assert yellows + + +class TestLinearCatboostFormulas: + """Cost surfaces for the classic (sklearn / catboost) scorers.""" + + def _embedder_node(self) -> dict[str, Any]: + return { + "node_type": "embedder", + "search_space": [ + { + "module_name": "sentence_transformer", + "embedder_config": [{"model_name": "sentence-transformers/all-MiniLM-L6-v2"}], + } + ], + } + + def test_linear_contributes_ram_and_time(self) -> None: + cfg = { + "search_space": [ + self._embedder_node(), + { + "node_type": "scoring", + "search_space": [{"module_name": "linear", "max_iter": [200]}], + }, + ], + "hpo_config": {"n_trials": 5}, + } + stats = DatasetStats.placeholder(n_samples=100_000, n_classes=10, avg_tokens=24) + report = run_preflight(cfg, stats, _profile()) + linear_drivers = [d for d in report.resource.drivers if d["module"] == "linear"] + assert len(linear_drivers) == 1 + assert report.resource.ram_gb > 0 + assert report.resource.time_hours > 0 + assert linear_drivers[0]["vram_gb"] == 0 # sklearn is CPU-only + + def test_logreg_cv_multiplier_dominates_multiclass_time(self) -> None: + """Multiclass linear uses LogisticRegressionCV (Cs*cv+1 ≈ 31 inner fits); + multilabel uses one LogReg per class (cv_multiplier=1). At equal n_classes, + multiclass must be much slower than the per-class multilabel path.""" + base = { + "search_space": [ + self._embedder_node(), + { + "node_type": "scoring", + "search_space": [{"module_name": "linear", "max_iter": [1000]}], + }, + ], + "hpo_config": {"n_trials": 1}, + } + multiclass = run_preflight( + base, + DatasetStats.placeholder(n_samples=100_000, n_classes=10, multilabel=False), + _profile(), + ) + multilabel = run_preflight( + base, + DatasetStats.placeholder(n_samples=100_000, n_classes=10, multilabel=True), + _profile(), + ) + # multiclass: 31 inner fits x 1 model; multilabel: 1 fit x n_classes=10 models. + # 31 > 10 => multiclass is the slower path. + assert multiclass.resource.time_hours > multilabel.resource.time_hours + + def test_catboost_contributes_ram_and_time_on_cpu(self) -> None: + cfg = { + "search_space": [ + self._embedder_node(), + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "catboost", + "iterations": [1000], + "depth": [6], + } + ], + }, + ], + "hpo_config": {"n_trials": 3}, + } + stats = DatasetStats.placeholder(n_samples=100_000, n_classes=8, avg_tokens=24) + report = run_preflight(cfg, stats, _profile(accelerator="cpu")) + cb = next(d for d in report.resource.drivers if d["module"] == "catboost") + assert report.resource.ram_gb > 0 + assert report.resource.time_hours > 0 + assert cb["vram_gb"] == 0 + assert cb["mode"] == "catboost" + + def test_catboost_gpu_moves_cost_to_vram(self) -> None: + cfg = { + "search_space": [ + self._embedder_node(), + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "catboost", + "iterations": [1000], + "depth": [6], + "task_type": "GPU", + } + ], + }, + ], + "hpo_config": {"n_trials": 2}, + } + stats = DatasetStats.placeholder(n_samples=100_000, n_classes=8, avg_tokens=24) + report = run_preflight(cfg, stats, _profile(accelerator="cuda")) + cb = next(d for d in report.resource.drivers if d["module"] == "catboost") + assert report.resource.vram_gb > 0 + assert cb["ram_gb"] == 0 + assert cb["mode"] == "catboost-gpu" + + def test_linear_scales_with_n_samples(self) -> None: + cfg = { + "search_space": [ + self._embedder_node(), + { + "node_type": "scoring", + "search_space": [{"module_name": "linear"}], + }, + ], + } + small = run_preflight(cfg, DatasetStats.placeholder(n_samples=500), _profile()) + big = run_preflight(cfg, DatasetStats.placeholder(n_samples=500_000), _profile()) + assert big.resource.time_hours > small.resource.time_hours + assert big.resource.ram_gb > small.resource.ram_gb + + +class TestPerDriverBatchHint: + """Each transformer driver carries its own (batch_size, max_batch_size) for rendering.""" + + def _bert_cfg(self, model_name: str, batch_size: int) -> dict[str, Any]: + return { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "classification_model_config": [{"model_name": model_name}], + "num_train_epochs": [3], + "batch_size": [batch_size], + } + ], + } + ], + "hpo_config": {"n_trials": 1}, + } + + def test_driver_records_current_and_max_batch(self) -> None: + report = run_preflight( + self._bert_cfg("microsoft/deberta-v3-large", batch_size=64), + DatasetStats.placeholder(), + _profile(vram_gb=7.5), + ) + drivers = [d for d in report.resource.drivers if d["module"] == "bert"] + assert drivers + d = drivers[0] + assert d["batch_size"] == 64 + # vram_gb=7.5 against ~5.9 GB weights x 0.9 tight ratio -> little activation room, max < 64. + assert d["max_batch_size"] is not None + assert 0 < d["max_batch_size"] < 64 + + def test_max_batch_zero_when_weights_alone_overflow(self) -> None: + report = run_preflight( + self._bert_cfg("microsoft/deberta-v3-large", batch_size=64), + DatasetStats.placeholder(), + _profile(vram_gb=2.0), + ) + d = next(d for d in report.resource.drivers if d["module"] == "bert") + assert d["max_batch_size"] == 0 + + def test_max_batch_can_be_larger_than_current(self) -> None: + report = run_preflight( + self._bert_cfg("microsoft/deberta-v3-large", batch_size=32), + DatasetStats.placeholder(), + _profile(vram_gb=64.0), + ) + d = next(d for d in report.resource.drivers if d["module"] == "bert") + assert d["max_batch_size"] is not None + assert d["max_batch_size"] > 32 + + def test_multiple_drivers_carry_independent_max_batch(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "classification_model_config": [ + {"model_name": "microsoft/deberta-v3-small"}, + {"model_name": "microsoft/deberta-v3-large"}, + ], + "num_train_epochs": [3], + "batch_size": [64], + } + ], + } + ], + "hpo_config": {"n_trials": 1}, + } + report = run_preflight(cfg, DatasetStats.placeholder(), _profile(vram_gb=10.0)) + small = next(d for d in report.resource.drivers if "small" in d["model"]) + large = next(d for d in report.resource.drivers if "large" in d["model"]) + # The smaller model has more headroom -> larger max batch (or equal-cap when both saturate). + assert small["max_batch_size"] >= large["max_batch_size"] + + +class TestDumpModulesBounding: + """`dump_modules=True` writes one selected variant per node per trial — not + every candidate. The estimate must be bounded by sum-of-max-per-node x n_trials.""" + + def test_dump_disk_is_bounded_by_per_node_max_not_sum_of_all_variants(self) -> None: + # Two BERT candidates in the same node: only one is selected per trial. + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "classification_model_config": [ + {"model_name": "microsoft/deberta-v3-small"}, + {"model_name": "microsoft/deberta-v3-large"}, + ], + "num_train_epochs": [3], + "batch_size": [16], + } + ], + } + ], + "hpo_config": {"n_trials": 4}, + "logging_config": {"dump_modules": True}, + } + report = run_preflight(cfg, DatasetStats.placeholder(), _profile()) + # Per-node max ~ deberta-v3-large weights (~350M x 4 ~ 1.3 GB). Two-candidate + # sum would be roughly doubled. Verify we used the per-node-max bound. + small_meta = _hub.resolve_model("microsoft/deberta-v3-small") + large_meta = _hub.resolve_model("microsoft/deberta-v3-large") + expected = large_meta.weights_gb * 4 + naive_sum = (small_meta.weights_gb + large_meta.weights_gb) * 4 + assert report.resource.disk_dump_gb == pytest.approx(expected, rel=0.01) + assert report.resource.disk_dump_gb < naive_sum + + def test_dump_disk_sums_across_nodes(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "embedder", + "search_space": [ + { + "module_name": "sentence_transformer", + "embedder_config": [{"model_name": "sentence-transformers/all-MiniLM-L6-v2"}], + } + ], + }, + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "classification_model_config": [{"model_name": "microsoft/deberta-v3-small"}], + "num_train_epochs": [3], + "batch_size": [16], + } + ], + }, + ], + "hpo_config": {"n_trials": 2}, + "logging_config": {"dump_modules": True}, + } + report = run_preflight(cfg, DatasetStats.placeholder(), _profile()) + embedder = _hub.resolve_model("sentence-transformers/all-MiniLM-L6-v2") + bert = _hub.resolve_model("microsoft/deberta-v3-small") + expected = (embedder.weights_gb + bert.weights_gb) * 2 + assert report.resource.disk_dump_gb == pytest.approx(expected, rel=0.01) diff --git a/tests/advisor/test_hardware_detection.py b/tests/advisor/test_hardware_detection.py new file mode 100644 index 000000000..d8131fb19 --- /dev/null +++ b/tests/advisor/test_hardware_detection.py @@ -0,0 +1,72 @@ +"""Hardware detection has to be safe on every machine — broken CUDA, no GPU, +no psutil. Verify the fallbacks work without raising. +""" + +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from autointent._advisor._hardware import detect_hardware + + +def test_cpu_fallback_when_no_accelerator() -> None: + with ( + patch("autointent._advisor._hardware._detect_cuda", return_value=None), + patch("autointent._advisor._hardware._detect_mps", return_value=None), + ): + hw = detect_hardware() + assert hw.accelerator == "cpu" + assert hw.vram_gb == 0.0 + assert hw.device_class == "cpu" + + +def test_cuda_branch_classifies_low_gpu() -> None: + with ( + patch( + "autointent._advisor._hardware._detect_cuda", + return_value=(8.0, "NVIDIA RTX 3060"), + ), + ): + hw = detect_hardware() + assert hw.accelerator == "cuda" + assert hw.vram_gb == pytest.approx(8.0) + assert hw.device_class == "low-gpu" + + +def test_mps_budget_uses_ram_fraction() -> None: + with ( + patch("autointent._advisor._hardware._detect_cuda", return_value=None), + patch("autointent._advisor._hardware._detect_ram_gb", return_value=32.0), + patch( + "autointent._advisor._hardware._detect_mps", + side_effect=lambda ram, ratio: (ram * ratio, "Apple Silicon (arm64)"), + ), + ): + hw = detect_hardware() + assert hw.accelerator == "mps" + assert hw.vram_gb == pytest.approx(32.0 * 0.7) + assert any("MPS unified memory" in n for n in hw.notes) + + +def test_vram_budget_override_applies() -> None: + with ( + patch( + "autointent._advisor._hardware._detect_cuda", + return_value=(24.0, "NVIDIA RTX 4090"), + ), + ): + hw = detect_hardware(vram_budget_gb=8.0) + assert hw.vram_gb == pytest.approx(8.0) + assert any("manual VRAM budget" in n for n in hw.notes) + + +def test_broken_cuda_returns_none_does_not_crash() -> None: + # _detect_cuda swallows torch quirks already; verify the wrapper holds. + with ( + patch("autointent._advisor._hardware._detect_cuda", return_value=None), + patch("autointent._advisor._hardware._detect_mps", return_value=None), + ): + hw = detect_hardware() + assert hw.accelerator == "cpu" diff --git a/tests/advisor/test_hub_heuristics.py b/tests/advisor/test_hub_heuristics.py new file mode 100644 index 000000000..c19018235 --- /dev/null +++ b/tests/advisor/test_hub_heuristics.py @@ -0,0 +1,66 @@ +"""Tests for the offline heuristic fallback in `_hub`. + +The advisor must produce a sensible estimate even when HF Hub is unreachable. +Without a per-name heuristic, every offline lookup collapses to a single +BERT-base-sized default — these tests pin that contract. +""" + +from __future__ import annotations + +import pytest + +from autointent._advisor import _hub + + +@pytest.fixture(autouse=True) +def _offline(monkeypatch: pytest.MonkeyPatch) -> None: + _hub.resolve_model.cache_clear() + # Force `_hub_metadata` to behave as if the live Hub were unreachable so + # resolve_model falls through to `_heuristic_metadata`. + monkeypatch.setattr(_hub, "_hub_metadata", lambda _name: None) + monkeypatch.setattr(_hub, "_is_warm_cached", lambda _name: False) + + +def test_offline_lookup_uses_bert_base_default() -> None: + """Every offline lookup returns the same BERT-base-sized fallback.""" + for name in ( + "microsoft/deberta-v3-large", + "sentence-transformers/all-MiniLM-L6-v2", + "totally-made-up/no-such-model", + ): + meta = _hub.resolve_model(name) + assert meta.confidence == "heuristic" + assert meta.total_params == _hub._DEFAULT_HEURISTIC_PARAMS + + +def test_weights_gb_matches_params_times_bytes() -> None: + meta = _hub.resolve_model("microsoft/deberta-v3-large") + expected_gb = meta.total_params * meta.weight_bytes_per_param / (1024**3) + assert meta.weights_gb == pytest.approx(expected_gb) + + +def test_local_path_returns_zero_disk() -> None: + meta = _hub.resolve_model("/tmp/local/path/to/model") + assert meta.total_file_bytes == 0 + assert meta.cached_locally is True + + +def test_disk_gb_falls_back_to_param_size_when_siblings_unknown() -> None: + meta = _hub.resolve_model("intfloat/multilingual-e5-large-instruct") + assert meta.disk_gb > 0 + assert meta.disk_gb == pytest.approx(meta.weights_gb, rel=0.01) + + +def test_resolve_is_memoized() -> None: + a = _hub.resolve_model("microsoft/deberta-v3-large") + b = _hub.resolve_model("microsoft/deberta-v3-large") + assert a is b + + +def test_metadata_fallback_uses_heuristic_when_hub_unreachable() -> None: + """End-to-end: resolve_model must return a usable ModelMeta even when + the live Hub is unreachable (autouse fixture forces offline).""" + meta = _hub.resolve_model("microsoft/deberta-v3-large") + assert meta.confidence == "heuristic" + assert meta.total_params > 0 + assert meta.disk_gb > 0 diff --git a/tests/advisor/test_render.py b/tests/advisor/test_render.py new file mode 100644 index 000000000..7a806c7f2 --- /dev/null +++ b/tests/advisor/test_render.py @@ -0,0 +1,170 @@ +"""Output rendering: text formatting and JSON serialization.""" + +from __future__ import annotations + +import json + +from autointent._advisor._render import _batch_hint, render_json, render_recommendation, render_text +from autointent._advisor._report import ( + DatasetStats, + PreflightReport, + ResourceEstimate, + Severity, +) + + +def _populated_report() -> PreflightReport: + r = PreflightReport( + preset_name="example", + hardware={ + "accelerator": "cuda", + "device_name": "RTX 3060", + "vram_gb": 8.0, + "ram_gb": 32.0, + "free_disk_gb": 100.0, + "device_class": "low-gpu", + }, + dataset={"n_samples": 500, "n_classes": 10, "avg_tokens": 30, "source": "placeholder"}, + resource=ResourceEstimate( + disk_download_gb=2.5, + disk_cached_gb=0.5, + ram_gb=1.0, + vram_gb=4.0, + time_hours=1.2, + drivers=[ + { + "node_type": "scoring", + "module": "bert", + "model": "x/y", + "mode": "full-finetune", + "vram_gb": 4.0, + "ram_gb": 1.0, + "time_hours": 1.2, + "confidence": "hub", + } + ], + ), + notes=["MPS unified memory note"], + ) + r.add("resource", Severity.TIGHT, "VRAM ~6 GB vs available 8 GB") + r.add("data", Severity.OVER, "rare classes blocked") + return r + + +class TestRenderText: + def test_contains_phase_blocks(self) -> None: + out = render_text(_populated_report()) + assert "Resource:" in out + assert "Data:" in out + # Config phase has no findings -> block omitted + assert "Config:" not in out + + def test_includes_drivers_block(self) -> None: + out = render_text(_populated_report()) + assert "Drivers of cost:" in out + assert "x/y" in out + + def test_verdict_reflects_headroom(self) -> None: + out = render_text(_populated_report()) + assert "Verdict: INFEASIBLE" in out + assert "headroom: over" in out + + def test_disclaimer_always_present(self) -> None: + out = render_text(_populated_report()) + assert "heuristic upper bounds" in out + + def test_low_confidence_tag_when_offline(self) -> None: + r = _populated_report() + r.low_confidence = True + out = render_text(r) + assert "low-confidence" in out + + def test_preset_name_in_title(self) -> None: + out = render_text(_populated_report()) + assert "Compute feasibility check — example" in out + + def test_empty_report_still_renders(self) -> None: + out = render_text(PreflightReport()) + assert "Compute feasibility check" in out + assert "Verdict: feasible" in out + + +class TestRenderJson: + def test_is_valid_json(self) -> None: + json.loads(render_json(_populated_report())) + + def test_findings_have_string_severity(self) -> None: + d = json.loads(render_json(_populated_report())) + for f in d["findings"]: + assert f["severity"] in {"ample", "tight", "over"} + + def test_headroom_and_feasibility_serialized(self) -> None: + d = json.loads(render_json(_populated_report())) + assert d["headroom"] == "over" + assert d["is_feasible"] is False + + def test_empty_report_serializes(self) -> None: + d = json.loads(render_json(PreflightReport())) + assert d["headroom"] == "ample" + assert d["is_feasible"] is True + + +class TestRenderRecommendation: + def _two_reports(self) -> list[tuple[str, PreflightReport]]: + a = PreflightReport(preset_name="a", resource=ResourceEstimate(vram_gb=2.0, time_hours=0.5)) + a.add("resource", Severity.AMPLE, "ok") + b = PreflightReport(preset_name="b", resource=ResourceEstimate(vram_gb=8.0, time_hours=4.0)) + b.add("resource", Severity.OVER, "too big") + return [("a", a), ("b", b)] + + def test_lists_chosen_preset_when_present(self) -> None: + out = render_recommendation(self._two_reports(), chosen="a") + assert "-> a" in out + + def test_handles_no_chosen(self) -> None: + out = render_recommendation(self._two_reports(), chosen=None) + assert "none of the bundled presets" in out + + def test_includes_all_presets_in_table(self) -> None: + out = render_recommendation(self._two_reports(), chosen="a") + assert "a " in out # preset name + assert "b " in out + + def test_shows_status_per_preset(self) -> None: + out = render_recommendation(self._two_reports(), chosen="a") + assert "feasible" in out + assert "infeasible" in out + + +class TestBatchHint: + """Per-driver batch cell rendered in the Drivers-of-cost table.""" + + def test_arrow_when_max_differs(self) -> None: + assert _batch_hint({"batch_size": 64, "max_batch_size": 32}) == "64 -> 32" + + def test_plain_when_max_equals_current(self) -> None: + assert _batch_hint({"batch_size": 64, "max_batch_size": 64}) == "64" + + def test_no_fit_label_when_max_zero(self) -> None: + assert _batch_hint({"batch_size": 64, "max_batch_size": 0}) == "64 (no fit)" + + def test_empty_when_no_batch(self) -> None: + assert _batch_hint({"batch_size": None, "max_batch_size": None}) == "" + + def test_increase_arrow(self) -> None: + assert _batch_hint({"batch_size": 32, "max_batch_size": 128}) == "32 -> 128" + + +def test_dataset_stats_in_text_block() -> None: + stats = DatasetStats.placeholder(n_samples=777, n_classes=4) + r = PreflightReport( + dataset={ + "n_samples": stats.n_samples, + "n_classes": stats.n_classes, + "avg_tokens": stats.avg_tokens, + "source": stats.source, + } + ) + out = render_text(r) + assert "777" in out + assert "n_classes=4" in out diff --git a/tests/advisor/test_report.py b/tests/advisor/test_report.py new file mode 100644 index 000000000..acb2b5bf8 --- /dev/null +++ b/tests/advisor/test_report.py @@ -0,0 +1,87 @@ +"""Unit tests for the report dataclasses.""" + +from __future__ import annotations + +import dataclasses + +import pytest + +from autointent._advisor._report import ( + DatasetStats, + Finding, + PreflightReport, + ResourceEstimate, + Severity, +) + + +class TestSeverityOrdering: + def test_headroom_on_empty_report_is_green(self) -> None: + assert PreflightReport().headroom == Severity.AMPLE + + def test_red_beats_yellow_beats_green(self) -> None: + r = PreflightReport() + r.add("resource", Severity.AMPLE, "ok") + r.add("data", Severity.TIGHT, "warn") + assert r.headroom == Severity.TIGHT + r.add("config", Severity.OVER, "fail") + assert r.headroom == Severity.OVER # type: ignore[comparison-overlap] + + def test_is_feasible_flips_on_any_red(self) -> None: + r = PreflightReport() + r.add("resource", Severity.TIGHT, "warn") + assert r.is_feasible is True + r.add("data", Severity.OVER, "fail") + assert r.is_feasible is False + + +class TestDatasetStatsPlaceholder: + def test_defaults_populate_p95_above_avg(self) -> None: + stats = DatasetStats.placeholder() + assert stats.n_samples == 1_000 + assert stats.p95_tokens is not None + assert stats.p95_tokens > stats.avg_tokens + assert stats.source == "placeholder" + + def test_overrides_propagate(self) -> None: + stats = DatasetStats.placeholder(n_samples=42, n_classes=3, avg_tokens=80, multilabel=True) + assert stats.n_samples == 42 + assert stats.n_classes == 3 + assert stats.avg_tokens == 80 + assert stats.multilabel is True + + +class TestResourceEstimate: + def test_total_disk_sums_download_and_dump(self) -> None: + e = ResourceEstimate(disk_download_gb=2.5, disk_dump_gb=4.0) + assert e.total_disk_gb == pytest.approx(6.5) + + def test_total_disk_ignores_cached(self) -> None: + e = ResourceEstimate(disk_download_gb=1.0, disk_cached_gb=100.0, disk_dump_gb=0.5) + assert e.total_disk_gb == pytest.approx(1.5) + + +class TestToDictSerialization: + def test_findings_round_trip_severity_as_string(self) -> None: + r = PreflightReport() + r.add("resource", Severity.OVER, "boom") + d = r.to_dict() + assert d["headroom"] == "over" + assert d["is_feasible"] is False + assert d["findings"] == [ + {"phase": "resource", "severity": "over", "message": "boom", "metric": None}, + ] + + def test_hardware_and_dataset_pass_through(self) -> None: + r = PreflightReport( + hardware={"accelerator": "cuda", "vram_gb": 8.0}, + dataset={"n_samples": 100, "n_classes": 5}, + ) + d = r.to_dict() + assert d["hardware"]["accelerator"] == "cuda" + assert d["dataset"]["n_samples"] == 100 + + def test_finding_is_frozen(self) -> None: + f = Finding(phase="resource", severity=Severity.AMPLE, message="ok") + with pytest.raises(dataclasses.FrozenInstanceError): + f.message = "changed" # type: ignore[misc]