diff --git a/.github/workflows/e2e-tests.yaml b/.github/workflows/e2e-tests.yaml index f3e8c45..fc60604 100644 --- a/.github/workflows/e2e-tests.yaml +++ b/.github/workflows/e2e-tests.yaml @@ -10,7 +10,7 @@ concurrency: env: KIND_VERSION: v0.32.0 - IMAGE: ghcr.io/converged-computing/fluence:latest + IMAGE: vanessa/fluence:test jobs: e2e: @@ -19,19 +19,20 @@ jobs: - name: Checkout uses: actions/checkout@v4 - #- name: Set up Docker Buildx - # uses: docker/setup-buildx-action@v3 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 - # - name: Build fluence image - # uses: docker/build-push-action@v6 - # with: - # context: . - # file: ./Dockerfile - # push: false - # load: true - # tags: ${{ env.IMAGE }} - # cache-from: type=gha - # cache-to: type=gha,mode=max + - name: Build fluence image + uses: docker/build-push-action@v6 + with: + context: . + file: ./Dockerfile + push: false + load: true + tags: ${{ env.IMAGE }} + cache-from: type=gha + cache-to: type=gha,mode=max + - name: Create k8s Kind Cluster uses: helm/kind-action@v1.10.0 with: @@ -55,6 +56,12 @@ jobs: echo "=== Disk space after cleanup ===" df -h + - name: Load docker images + run: | + kind get clusters + cluster=$(kind get clusters) + kind load --name $cluster docker-image vanessa/fluence:test + - name: Deploy fluence (base) run: | kubectl apply -f deploy/fluence-test.yaml @@ -62,6 +69,7 @@ jobs: POD=$(kubectl -n kube-system get pods -l app=fluence -o name | head -1) kubectl -n kube-system exec "${POD#pod/}" -- ls /tmp/ kubectl -n kube-system logs "${POD#pod/}" + sleep 2 kubectl -n kube-system exec "${POD#pod/}" -- /bin/bash -c "cat /tmp/fluence-graph-*.json" kubectl get nodes -o jsonpath='{range .items[*]}{.metadata.name}{": cpu="}{.status.allocatable.cpu}{" mem="}{.status.allocatable.memory}{"\n"}{end}' @@ -80,6 +88,7 @@ jobs: sleep 1 done POD=$(kubectl -n kube-system get pods -l app=fluence -o name | head -1) + sleep 2 kubectl -n kube-system exec "${POD#pod/}" -- /bin/bash -c "cat /tmp/fluence-graph-*.json" - name: Wait for webhook diff --git a/Makefile b/Makefile index ed62c28..df5519f 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ FLUX_SCHED_ROOT ?= /opt/flux-sched IMG ?= ghcr.io/converged-computing/fluence:latest -TEST_IMG ?= ghcr.io/converged-computing/fluence:test +TEST_IMG ?= vanessa/fluence:test # cgo flags for the scheduler binary: flux-sched only. CGO_CFLAGS = -I$(FLUX_SCHED_ROOT) @@ -41,7 +41,7 @@ image: ## Build the scheduler container image .PHONY: test-image test-image: ## Build the scheduler container image - docker build -t $(TEST_IMG)-test . + docker build -t $(TEST_IMG) . docker push $(TEST_IMG) .PHONY: test-image-deploy diff --git a/README.md b/README.md index 50849dc..0068322 100644 --- a/README.md +++ b/README.md @@ -2,123 +2,270 @@ ![img/fluence.png](img/fluence.png) -🚧 **UNDER DEVELOPMENT** 🚧 not ready for production use! I rolled back features since the recorded demo, and am going to add them back with proper testing. I have not finished this yet, but anticipate later in the week of 6/16/2026. Thank you for your patience! -@vsoch - -A Kubernetes scheduler plugin that places **pod groups** (and individual pods) -by matching them against a [Fluxion](https://github.com/flux-framework/flux-sched) -(flux-sched) resource graph built from the live cluster. - -This is an update from [flux-k8s](https://github.com/flux-framework/flux-k8s) -that uses the native PodGroup and optionally allows for scheduling -against arbitrary resources such as **quantum resources** modeled in the same graph. -I am also improving the design by not requiring a sidecar for fluence, and not -requiring the `kubernetes-sigs/scheduler-plugins` dependency. We use native Gang -scheduling provided by Kubernetes. - -For quantum resource modeling, we start from the prototype proven out in +🚧 **UNDER DEVELOPMENT** 🚧 Thank you for your patience! -@vsoch + +A Kubernetes scheduler plugin that places **pod groups** (and individual pods) by +matching them against a [Fluxion](https://github.com/flux-framework/flux-sched) +resource graph built from the live cluster. Beyond ordinary compute, it can model +**arbitrary virtual resources** — quantum backends today, anything with a count +and some attributes tomorrow — as first-class graph vertices that can be filtered +on and whose attributes are injected into the workload's environment. Nothing in +the design is quantum-specific: a resource `type` is an opaque string throughout. + +This updates [flux-k8s](https://github.com/flux-framework/flux-k8s) to use the +native Kubernetes PodGroup (Gang) API — no sidecar, and no +`kubernetes-sigs/scheduler-plugins` dependency. The quantum resource model is +proven out first in [fluxion-quantum](https://github.com/converged-computing/fluxion-quantum). -## How it works - -### Gang Scheduling - -Gang semantics (all-or-nothing) come from the native `PodGroup` API. Fluence is -responsible only for **placement**: - -1. **Discover** — on startup fluence lists cluster nodes and turns their - cpu/memory/gpu capacity into a Fluxion JGF resource graph - (`pkg/cluster` + `pkg/jgf`). If a resources config is provided (via - `FLUENCE_RESOURCES`), its entries (e.g. quantum backends) are injected as - `qpu`/`qubit` vertices. With no config the graph is classical-only. -2. **Match** — when the first pod of a group hits `PreFilter`, fluence builds a - Fluxion jobspec for the whole gang (`pkg/placement.JobspecForGroup`), asks the - matcher to allocate (`pkg/graph.FluxionGraph.MatchAllocateSpec`), and parses - the allocation into node and backend names (`PlacementFromAllocation`). -3. **Place** — `Filter` permits each pod only on its allocated node. (A - quantum-only pod allocates a `qpu` but no node — the backend is a remote API - any node can reach — so fluence imposes no node constraint in that case.) -4. **Hand off** — for a quantum pod, `PreBind` records the allocated backend on - the pod as the `fluence.flux-framework.org/backend` annotation. The mutating - webhook (installed with the base) injects a downward-API env so the container - reads it as `QRMI_BACKEND` with no boilerplate in the manifest. - -### Design Choices - -While Quantum resources are this first target, notably we should be able to support -any arbitrary resource in the graph. I decided that a pod can request a graph resource generically -e.g., `fluxion.flux-framework.org/` (like `.../qpu: "1"`) and that becomes a jobspec count -of ``. To support this, we deploy a **device plugin** that can advertise these virtual -types on every node. We need to do this because of the in-tree `NodeResourcesFit` endpoint. -If we do not have the device plugin, this call will not be satisfied. Note that -this device plugin will return True for any resources it sees added to the Fluxion resource graph, -but is not actually involved with scheduling. Fluxion does the real matching. +## How the pieces fit together ```console -nodes (kubectl get nodes) ──┐ - ├─► JGF resource graph ─► Fluxion match ─► node + backend placement -fluence-resources ConfigMap ┘ + resources.yaml --+ + +--------------+-------------------------------+ + v v v +scheduler device plugin webhook +(pkg/fluence) (advertises types) (injects env) + | + | build graph (pkg/cluster -> pkg/jgf) + | generate jobspecs (pkg/placement -> pkg/jobspec) + | match + cancel via Fluxion (pkg/graph, cgo) + | parse allocation (pkg/graph, pkg/placement) + v + pod bound to a node + backend/attrs stamped as annotations + | + v + webhook-injected env reads those annotations +``` + +A single `resources.yaml` is the source of truth, read by three independent +processes: the **scheduler** builds the graph and validates requests, the **device +plugin** advertises the requestable types as Kubernetes extended resources (so +pods requesting them pass admission), and the **webhook** computes the +environment-variable contract it injects into workloads. Note that the device +plugin can be removed from the design, assuming we do not need a `NodeResourcesFit` check. +I have kept it for now anticipating future cases where we have node-specific resources. +For now, the counts can be viewed as API quotas. + +For one pod group the scheduler generates one or more jobspecs, match-allocates +each against the graph (all-or-nothing), combines them into a placement, binds the +pod, and stamps the matched backend + attributes as annotations. The webhook has +already injected downward-API env vars that read those annotations, so the +workload sees which backend and attributes it got. + +## The resource model + +**Every allocatable thing in the graph is a `node`-typed vertex** -- physical +compute nodes and every level of every virtual resource alike. This is deliberate: +Fluxion's RFC 31 property constraints (the only attribute-based pruning the matcher +offers) apply **only** to `node` and `storage_node` vertices, so modeling a virtual +resource as a node is what makes it filterable. + +- **Physical nodes** carry `virtual=false`. +- **Virtual resources** carry `virtual=true`, a `class=` for the resource's + own type *and each descendant type in its subtree*, and their attributes. + +Property values are encoded into the property **key** (`virtual=true`, +`class=qpu`, `region=us-east-1`) because RFC 31 matching is key-presence only -- it +never compares the value half. A resource is selected by a `class=` +constraint, so any level is independently requestable (`class=qdevice`, +`class=qpu`, `class=qubit`). Descendant classes are propagated **up** so a global +constraint selecting a nested type isn't pruned at an ancestor node on the way +down; attributes are inherited **down** (a child gets its parent's attributes +unless it overrides or clears a key) so an attribute filter combined with a nested +class still reaches the target. + +Because a jobspec carries one global constraint, compute (`virtual=false`) and a +virtual resource (`virtual=true`) cannot be co-selected in one match -- one would +prune the other. A pod needing both produces **two** match-allocate calls, held +together all-or-nothing. + +--- + +## Components + +### `pkg/jgf` — JGF graph builder + +Builds the JSON Graph Format document Fluxion consumes. `AddRoot`/`AddChild` +create vertices with `Options` (size, unit, exclusivity, rank, properties). +`Options.NodeProperties` are RFC 31 resource properties (a nested `properties` +object the matcher prunes against); `Options.Properties` are descriptive metadata +only. `Options.Rank` sets a real execution rank — every `node` vertex needs one. + +### `pkg/cluster` — config schema and graph construction + +Parses the resource config and turns it (plus the live cluster nodes) into a graph. + +```yaml +resources: + - type: qdevice # opaque type; the resource's class + name: rigetti_cepheus # vertex name / backend identity + parent: cluster # where it attaches (default "cluster") + attributes: aws-east # inline map OR a reference into the registry below + with: # recursive children, same schema + - type: qpu + count: 1 + with: + - type: qubit + count: 80 + +attributes: # named attribute-set registry (reuse across backends) + aws-east: + region: us-east-1 + connectivity: all-to-all ``` -I am also choosing to keep credentials and qrmi interactions on the level of the application. -I am not comfortable with the design of an operator holding any kind of credential or being -responsible for managing calls with qrmi in a multi-tenant environment. Finally, since -there are (and will continue to be) a lot of environment variables that I do not want -to place on the user to define, we have a webhook to handle this. We can combine an annotation -added with the webhook with a PreBind call to define the annotation to orchestrate that. +`LoadResourcesConfig` parses YAML/JSON, resolves attributes (inline or by +reference) with downward inheritance, and errors on a missing `type`, an unknown +reference, or a non-empty config that defines no `resources:` (a schema mismatch, +so it fails loudly instead of building an empty graph). `BuildGraph` emits each +cluster node as a `virtual=false` node, then appends the resource trees: every +resource at every depth becomes a `virtual=true` node carrying its class set and +inherited attributes, on a real rank from a counter shared with the physical +nodes. `FluxionResourceNames` returns every requestable type (what the device +plugin advertises); `AttributeKeys` returns the union of attribute keys (the +webhook's env contract). + +### `pkg/jobspec` — jobspec types + +The Fluxion jobspec representation (`Jobspec`, `Resource`, `Task`) with YAML/JSON +round-tripping: a `resources` tree (a `slot` holding the request), an `attributes` +block (constraint + duration), and `tasks`. + +### `pkg/placement` — jobspec generation and allocation parsing + +`JobspecsForGroup` turns a pod group into the jobspecs to match: + +- Always one **compute** jobspec — a slot per pod holding core/memory/gpu, + constrained to `virtual=false`. A classical group produces only this (one match). +- One **device** jobspec per requested virtual type, requesting a `node` + constrained to `virtual=true` + `class=`. The class selects which virtual + node; the count comes from the request. +- Every pod gets at least one core (a device-only pod still needs a host). +- Every jobspec sets `duration: 0` — held until explicitly cancelled. +- A request for an unmodeled type is a hard error. + +Jobspecs are submitted as **JSON** (not YAML): the constraint parser requires +quoted property scalars, and JSON always quotes. + +`PlacementFromAllocation` classifies an allocation's node vertices by the +`virtual` marker — `virtual=false`/unmarked are compute bind targets, a +`virtual=true` node is the backend identity, and its `fluxion.flux-framework.org/` +properties are decomposed into attributes for env injection. This package also +owns the shared names: the `fluxion.flux-framework.org/` request prefix, the +`fluence.flux-framework.org/{backend,jobid,attr-*}` annotations, and the +`FLUXION_` env prefix. + +### `pkg/graph` — Fluxion binding (cgo) + +Wraps the cgo matcher (`FluxionGraph`: `Init`, `MatchAllocateSpec`, `Cancel`, +`Satisfy`) and parses allocations. Fluence uses the **jgf** match format: it emits +every allocated vertex with its properties regardless of type, so a virtual +allocation that bottoms out in nodes (no cores) still serializes — which rv1 +cannot. `NodesFromAllocation` returns node vertices with their properties for the +marker-based classification. This is the only cgo-dependent package, so it gates +local builds of everything importing it. + +### `pkg/fluence` — the scheduler plugin + +`New` lists the cluster, loads the config, builds and logs the graph, and inits the +matcher. + +- **PreFilter** runs per group: generate jobspecs, then `matchGroup` runs each as + an independently held allocation, **all-or-nothing** — any failure cancels the + successes, so the group never holds a partial set. +- **Filter** permits only the nodes Fluxion assigned. +- **PreBind** records durable state: the jobids on the owning object (PodGroup for + a gang, else the pod) for cancellation, and the matched backend + attributes as + annotations the webhook reads. +- **Cancellation** is informer-driven (no framework delete hook): deleting a + PodGroup or ungrouped pod frees its held allocations. The jobid annotation is the + durable source of truth; the graph (and allocations) are rebuilt on restart from + the same annotations. + +Gang semantics are delegated to the native PodGroup API; fluence only places. + +### `pkg/deviceplugin` — extended-resource advertisement + +Advertises each requestable type (`fluxion.flux-framework.org/`) as a +counted extended resource, so a pod requesting one passes `NodeResourcesFit` +admission. The real gating is Fluxion (and the backend's own limits); since a +virtual backend is reachable from any node, each type is advertised at a large +ceiling. Types come from the same config as the graph, so they can't drift. + +### `pkg/webhook` — environment injection + +A mutating webhook that surfaces scheduler-chosen values to a workload. Container +env is fixed at creation but the match happens after admission, so it injects +**downward-API** env vars whose values populate later from the annotations PreBind +writes. The injected set is a **config-derived contract**: `FLUXION_BACKEND` plus +one `FLUXION_` per attribute key across all backends — add an attribute and +its env var appears, no code change. The webhook self-manages TLS (generates a CA +and patches its own `caBundle`), so no cert-manager is needed. A vendor-agnostic +workload reads these normalized names regardless of which backend it matched. + +## Commands + +- `cmd/fluence` — the scheduler binary (stock kube-scheduler + the plugin). +- `cmd/deviceplugin` — the extended-resource DaemonSet. +- `cmd/webhook` — the env-injection webhook. +- `cmd/recovery-probe` — verifies allocation replay survives a graph rebuild + (what a restart does); see `make test-restore`. Note this was implemented but removed because the code in fluxion is only part of a PR branch, and I feel nervous about depending on it. + +## Configuration + +These are environment variables for fluence. + +| Env var | Read by | Meaning | +|---|---|---| +| `FLUENCE_RESOURCES` | scheduler, device plugin, webhook | path to `resources.yaml`; absent = classical-only | +| `FLUENCE_MATCH_POLICY` | scheduler | Fluxion match policy (default `first`) | +| `FLUENCE_RESOURCE_CAPACITY` | device plugin | per-node ceiling per type (default 1000) | + +## Observability + +The scheduler logs (prefix `[fluence]`) the full graph and known devices at +startup and, per match, the submitted jobspec, the raw Fluxion allocation, and the +parsed placement. The webhook logs (`[fluence-webhook]`) the env contract at +startup. Because live behavior (cgo matcher, real Kubernetes) can't be fully +unit-tested, these logs are the primary debugging surface. ## Build -The scheduler binary links flux-sched (the matcher). It does **not** link QRMI — -quantum job submission lives in a separate workload container +The scheduler links flux-sched (the matcher). It does **not** link QRMI or any +quantum backend — quantum job submission lives in a separate workload container ([qrmi-sampler](https://github.com/converged-computing/qrmi-sampler)), not here. ```bash # Inside the .devcontainer (flux-sched at /opt/flux-sched): -# builds bin/fluence (cgo+flux) + bin/fluence-deviceplugin + bin/fluence-webhook -make build +make build # bin/fluence (cgo+flux) + bin/fluence-deviceplugin + bin/fluence-webhook make test - -# Or build the container image (all three binaries): -make image +make image # or build the container image with all three binaries ``` ## Deploy -Create a development cluster on a Kubernetes release that supports native gang -scheduling, with the feature gates enabled: +Create a cluster on a Kubernetes release with native gang scheduling and the +feature gates the kind config enables (`GangScheduling`, `GenericWorkload`, the +`scheduling.k8s.io/v1alpha2` API group): ```bash kind create cluster --image kindest/node:v1.36.1 --config deploy/kind-config.yaml -``` - -(See [installing kind](https://kind.sigs.k8s.io/docs/user/quick-start#installing-from-release-binaries).) -The kind config turns on the `GangScheduling` and `GenericWorkload` feature gates -and the `scheduling.k8s.io/v1alpha2` API group on the apiserver and scheduler. In -the future these will likely be enabled by default. - -Load the image (built above) into the cluster: - -```bash kind load docker-image ghcr.io/converged-computing/fluence:latest ``` -### 1. Gang Scheduling - -Install the **base** scheduler (this is all you need for classical scheduling — -no device plugin, no quantum): +### 1. Gang scheduling (classical — all you need for non-quantum) ```bash -kubectl apply -f deploy/fluence.yaml +kubectl apply -f deploy/fluence.yaml # scheduler, RBAC, and the webhook ``` -This installs the scheduler, its RBAC, and the mutating webhook. Pods opt in with -`schedulerName: fluence`; a multi-pod gang adds a `scheduling.k8s.io/pod-group` -label (a single pod is treated as a group of one and needs no label). Test with a pod group: +Pods opt in with `schedulerName: fluence`; a multi-pod gang adds a +`scheduling.k8s.io/pod-group` label (a single pod is a group of one, no label +needed). ```bash kubectl apply -f examples/podgroup.yaml -kubectl get pods -o wide -kubectl get events --field-selector reason=Scheduled kubectl get podgroups.scheduling.k8s.io ``` ```console @@ -126,119 +273,81 @@ NAME POLICY WORKLOAD STATUS AGE training Gang Scheduled 15s ``` -And a quick cleanup. +Cleanup: ```bash kubectl patch podgroup training -n default --type=merge -p '{"metadata":{"finalizers":null}}' kubectl delete -f examples/podgroup.yaml ``` -### 2. Quantum +### 2. Quantum (the resources add-on) -Quantum needs the resources add-on, which supplies the `fluence-resources` -ConfigMap (the single source of truth for which backends exist) **and** the -device plugin that advertises them: +This supplies the `fluence-resources` ConfigMap (the source of truth for which +backends exist) and the device plugin that advertises them: ```bash kubectl apply -f deploy/fluence-resources.yaml kubectl apply -f deploy/device-plugin.yaml -# The scheduler reads its resources config at startup, so restart it to pick up -# the quantum vertices: +# The scheduler and webhook read the config at startup — restart to pick it up: kubectl rollout restart deployment/fluence -n kube-system +kubectl rollout restart deployment/fluence-webhook -n kube-system ``` -Confirm the device plugin advertised the resources on the nodes: +Confirm the resources are advertised: ```bash kubectl get nodes -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.status.allocatable}{"\n"}{end}' \ | grep fluxion.flux-framework.org ``` -```console -kind-control-plane {"cpu":"16","ephemeral-storage":"982292956Ki","fluxion.flux-framework.org/qpu":"1k","fluxion.flux-framework.org/qubit":"1k","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"61400748Ki","pods":"110"} -kind-worker {"cpu":"16","ephemeral-storage":"982292956Ki","fluxion.flux-framework.org/qpu":"1k","fluxion.flux-framework.org/qubit":"1k","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"61400748Ki","pods":"110"} -kind-worker2 {"cpu":"16","ephemeral-storage":"982292956Ki","fluxion.flux-framework.org/qpu":"1k","fluxion.flux-framework.org/qubit":"1k","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"61400748Ki","pods":"110"} -``` -Create the IBM credentials the **workload** uses to submit (in the namespace -where the workload runs — the scheduler itself never needs them): +Create the IBM credentials the **workload** uses to submit (the scheduler never +needs them): ```bash -# If you don't have this yet -curl -fsSL https://clis.cloud.ibm.com/install/linux | sudo sh ibmcloud login --apikey -# 12 for us-east -``` -```bash export IBM_CLOUD_TOKEN= -export IBM_CLOUD_CRN=$(ibmcloud resource service-instances --service-name quantum-computing --output json | jq -r '.[] | {name: .name, crn: .crn}' | jq -r .crn) +export IBM_CLOUD_CRN=$(ibmcloud resource service-instances --service-name quantum-computing --output json | jq -r '.[0].crn') +kubectl create secret generic ibm-quantum -n default \ + --from-literal=token="$IBM_CLOUD_TOKEN" --from-literal=crn="$IBM_CLOUD_CRN" ``` -```bash -kubectl create secret generic ibm-quantum -n default --from-literal=token="$IBM_CLOUD_TOKEN" --from-literal=crn="$IBM_CLOUD_CRN" -``` - -Run a single quantum pod. It just requests `fluxion.flux-framework.org/qpu` — no -group, and no hard-coded backend (the webhook + PreBind supply `QRMI_BACKEND`): +Run a single quantum pod — it just requests `fluxion.flux-framework.org/qpu`, with +no hard-coded backend (the webhook + PreBind supply `FLUXION_BACKEND`): ```bash kubectl apply -f examples/quantum-pod.yaml -kubectl get pod sampler -o wide -# fluence's chosen backend, injected as an environment variable: +# fluence's chosen backend, also injected as $FLUXION_BACKEND in the container: kubectl get pod sampler -o jsonpath='{.metadata.annotations.fluence\.flux-framework\.org/backend}{"\n"}' kubectl logs sampler ``` ```console -kubectl logs sampler -f 2026/06/06 19:04:38 submitting sampler job to ibm_marrakesh -{"results": [{"data": {"c": {"samples": ["0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0"], "num_bits": 1}}, "metadata": {"circuit_metadata": {}}}], "metadata": {"execution": {"execution_spans": [[{"date": "2026-06-06T19:04:43.221657"}, {"date": "2026-06-06T19:04:44.372421"}, {"0": [[256], [0, 1], [0, 256]]}]]}, "version": 2}} +{"results": [ ... samples ... ]} 2026/06/06 19:04:50 done: 2070 bytes from ibm_marrakesh ``` -Boum! You will see in the fluence logs that when the pod completes, the fluxion job is cancelled, freeing the resources. - -```bash -kubectl logs -n kube-system fluence-75d6848778-g4lh6 -... -I0610 18:33:05.843325 1 eventhandlers.go:443] "Delete event for scheduled pod" pod="default/sampler" - 🌀 Cancel jobid: 1 -(env) (base) vanessa@vanessa-ThinkPad-P14s-Gen-4:~/Desktop/Code/fluence$ kubectl get pods -NAME READY STATUS RESTARTS AGE -sampler 0/1 Completed 0 24s -``` - -### A note on deletion +When the pod completes, fluence cancels the Fluxion allocation, freeing the +resources (visible in the scheduler logs as `Cancel jobid: N`). Note that I lost access to +my IBM account so this has not been tested live since mid June. -When developing/debugging, a PodGroup (or its pods) can hang on delete because of -finalizers (the workload controller may not be running). Our plugin is designed to handle this, -and normally it just takes a little time to finish. If you are impatient, you can do: +Submission is **not** done by the scheduler — the workload container holds the +user's credentials and submits via qrmi-go. Fluence only schedules and hands off +the backend. (When we control local quantum devices this will change.) -```bash -kubectl patch podgroup training -n default --type=merge -p '{"metadata":{"finalizers":null}}' -``` - -### Restoring State - -The plugin is designed to be able to come up and restore state, meaning we can read in existing groups -and repopulate the graph. We do that by way of annotating each group with the resources "R" that fluxion returns on a match, -and then in the case of a restart, we re-populate the graph using this metadata. Here is how to test that. - -```bash -make test-restore -``` +### Notes -Importantly, submission is **not** done by the scheduler — the workload container holds the -user's credentials and submits via qrmi-go (job mode on the IBM open plan; see -fluxion-quantum for that story). Fluence only schedules and hands off the backend. -When we actually have control of local quantum devices this will be different. +- **Deletion hangs.** A PodGroup can hang on delete via finalizers if the workload + controller isn't running; clear them with the `kubectl patch` above. +- **State restore.** On restart the plugin repopulates the graph from each group's + jobid annotations and re-holds the allocations: `make test-restore`. ## License -HPCIC DevTools is distributed under the terms of the MIT license. -All new contributions must be made under this license. +Distributed under the MIT license; all contributions must be made under it. -See [LICENSE](LICENSE), [COPYRIGHT](COPYRIGHT), and [NOTICE](NOTICE) for details. +See [LICENSE](LICENSE), [COPYRIGHT](COPYRIGHT), and [NOTICE](NOTICE). SPDX-License-Identifier: MIT -LLNL-CODE-842614 +LLNL-CODE-842614 \ No newline at end of file diff --git a/cmd/deviceplugin/main.go b/cmd/deviceplugin/main.go index 8e88e6b..29cb55f 100644 --- a/cmd/deviceplugin/main.go +++ b/cmd/deviceplugin/main.go @@ -40,11 +40,11 @@ func main() { var names []string if data, err := os.ReadFile(cfgPath); err == nil { - qc, perr := cluster.LoadQuantumConfig(data) + rc, perr := cluster.LoadResourcesConfig(data) if perr != nil { log.Fatalf("parse resources config %s: %v", cfgPath, perr) } - names = cluster.FluxionResourceNames(qc.Backends) + names = cluster.FluxionResourceNames(rc.Resources) log.Printf("derived %d resource(s) from %s: %v", len(names), cfgPath, names) } else { log.Printf("no resources config at %s (%v); advertising nothing", cfgPath, err) diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go index bc5f816..20fac0d 100644 --- a/cmd/webhook/main.go +++ b/cmd/webhook/main.go @@ -17,6 +17,7 @@ import ( "os" "time" + "github.com/converged-computing/fluence/pkg/cluster" "github.com/converged-computing/fluence/pkg/webhook" "k8s.io/client-go/kubernetes" @@ -66,8 +67,27 @@ func main() { cancel() log.Printf("patched caBundle on MutatingWebhookConfiguration %q", cfgName) + // The env contract is the union of attribute keys across the configured + // backends (plus FLUXION_BACKEND), so the set of injected env vars tracks the + // config automatically. Loaded from the same FLUENCE_RESOURCES the scheduler + // and device plugin use; absent/unset means just FLUXION_BACKEND. + var attrKeys []string + if path := os.Getenv("FLUENCE_RESOURCES"); path != "" { + if data, rerr := os.ReadFile(path); rerr == nil { + rc, perr := cluster.LoadResourcesConfig(data) + if perr != nil { + log.Fatalf("parse resources config %s: %v", path, perr) + } + attrKeys = cluster.AttributeKeys(rc.Resources) + } else { + log.Printf("no resources config at %s (%v); injecting FLUXION_BACKEND only", path, rerr) + } + } + mutator := &webhook.Mutator{AttributeKeys: attrKeys} + log.Printf("[fluence-webhook] env contract injected into fluxion pods: %v", mutator.EnvVarNames()) + mux := http.NewServeMux() - mux.HandleFunc("/mutate", webhook.Handler) + mux.HandleFunc("/mutate", mutator.Handler) mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) }) srv := &http.Server{ diff --git a/deploy/fluence-resources-test.yaml b/deploy/fluence-resources-test.yaml index d4fed97..b584bfb 100644 --- a/deploy/fluence-resources-test.yaml +++ b/deploy/fluence-resources-test.yaml @@ -22,13 +22,30 @@ metadata: namespace: kube-system data: resources.yaml: | - backends: - - name: ibm_fez - num_qubits: 156 - vendor: ibm - qrmi_type: qiskit-runtime-service - - name: ibm_marrakesh - num_qubits: 156 + # New generic schema: each backend is a resource tree attached under the + # cluster. Modeled as a qdevice carrying a requestable qpu child; attributes + # become filterable graph properties AND injected env (FLUXION_). + resources: + - type: qdevice + name: ibm_fez + attributes: ibm + with: + - type: qpu + count: 1 + with: + - type: qubit + count: 156 + - type: qdevice + name: ibm_marrakesh + attributes: ibm + with: + - type: qpu + count: 1 + with: + - type: qubit + count: 156 + attributes: + ibm: vendor: ibm qrmi_type: qiskit-runtime-service --- @@ -53,7 +70,7 @@ spec: - operator: Exists # run on every node, including tainted/control-plane containers: - name: deviceplugin - image: ghcr.io/converged-computing/fluence:test + image: vanessa/fluence:test command: ["/bin/fluence-deviceplugin"] env: - name: FLUENCE_RESOURCES @@ -73,4 +90,4 @@ spec: path: /var/lib/kubelet/device-plugins - name: resources configMap: - name: fluence-resources + name: fluence-resources \ No newline at end of file diff --git a/deploy/fluence-resources.yaml b/deploy/fluence-resources.yaml index 973d586..c11fea3 100644 --- a/deploy/fluence-resources.yaml +++ b/deploy/fluence-resources.yaml @@ -22,12 +22,29 @@ metadata: namespace: kube-system data: resources.yaml: | - backends: - - name: ibm_fez - num_qubits: 156 - vendor: ibm - qrmi_type: qiskit-runtime-service - - name: ibm_marrakesh - num_qubits: 156 + # New generic schema: each backend is a resource tree attached under the + # cluster. Modeled as a qdevice carrying a requestable qpu child; attributes + # become filterable graph properties AND injected env (FLUXION_). + resources: + - type: qdevice + name: ibm_fez + attributes: ibm + with: + - type: qpu + count: 1 + with: + - type: qubit + count: 156 + - type: qdevice + name: ibm_marrakesh + attributes: ibm + with: + - type: qpu + count: 1 + with: + - type: qubit + count: 156 + attributes: + ibm: vendor: ibm qrmi_type: qiskit-runtime-service diff --git a/deploy/fluence-test.yaml b/deploy/fluence-test.yaml index 525713e..0eb6f8a 100644 --- a/deploy/fluence-test.yaml +++ b/deploy/fluence-test.yaml @@ -124,9 +124,9 @@ spec: serviceAccountName: fluence containers: - name: fluence - image: ghcr.io/converged-computing/fluence:test + image: vanessa/fluence:test # Allows for kind load - # imagePullPolicy: Never + imagePullPolicy: Never command: - /bin/fluence - --config=/etc/fluence/scheduler-config.yaml @@ -173,9 +173,9 @@ spec: serviceAccountName: fluence containers: - name: webhook - image: ghcr.io/converged-computing/fluence:test + image: vanessa/fluence:test # Allows for kind load - # imagePullPolicy: Never + imagePullPolicy: Never command: ["/bin/fluence-webhook"] ports: - containerPort: 8443 diff --git a/examples/test/e2e/quantum-pod-mock-2.yaml b/examples/test/e2e/quantum-pod-mock-2.yaml index 5286fe0..e8c443b 100644 --- a/examples/test/e2e/quantum-pod-mock-2.yaml +++ b/examples/test/e2e/quantum-pod-mock-2.yaml @@ -11,7 +11,7 @@ spec: containers: - name: sampler image: busybox - command: ["sh", "-c", "echo BACKEND=$QRMI_BACKEND; sleep 3600"] + command: ["sh", "-c", "echo BACKEND=$FLUXION_BACKEND; sleep 3600"] resources: requests: fluxion.flux-framework.org/qpu: "1" diff --git a/examples/test/e2e/quantum-pod-mock.yaml b/examples/test/e2e/quantum-pod-mock.yaml index 92849dd..3290611 100644 --- a/examples/test/e2e/quantum-pod-mock.yaml +++ b/examples/test/e2e/quantum-pod-mock.yaml @@ -13,7 +13,7 @@ spec: - name: sampler image: busybox # Print the injected backend, then idle so we can assert on it. - command: ["sh", "-c", "echo BACKEND=$QRMI_BACKEND; sleep 3600"] + command: ["sh", "-c", "echo BACKEND=$FLUXION_BACKEND; sleep 3600"] resources: requests: fluxion.flux-framework.org/qpu: "1" diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 6bb572a..56a845b 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1,45 +1,25 @@ // Package cluster builds a Fluxion resource graph from the live Kubernetes -// cluster. Traditional compute (cpu/memory/gpu) is discovered from node -// capacity; virtual quantum resources are injected from configuration so the -// same graph can carry both classical and quantum vertices. +// cluster. Physical compute (cpu/memory/gpu) is discovered from node capacity; +// virtual resources (e.g. quantum backends, but nothing here is quantum-specific) +// are injected from a generic resource-tree configuration so the same graph can +// carry both physical and virtual vertices. +// +// Model: every allocatable thing is a "node" vertex. Physical nodes carry the +// property virtual=false; configured virtual resources carry virtual=true plus +// their configured attributes as RFC 31 properties. A pod's jobspec constrains +// virtual=false for the compute it lands on and virtual=true for any virtual +// device it requests, so the two are matched from disjoint vertex sets out of a +// single shared rank space. package cluster import ( - "fmt" - "sort" - "github.com/converged-computing/fluence/pkg/jgf" - "github.com/converged-computing/fluence/pkg/placement" corev1 "k8s.io/api/core/v1" - "sigs.k8s.io/yaml" ) // DefaultGPUResource is the resource name GPUs are advertised under. const DefaultGPUResource = "nvidia.com/gpu" -// QuantumBackend describes a virtual quantum resource to model in the graph. -// The Name becomes the qpu vertex name (and the QRMI backend the job runs on). -type QuantumBackend struct { - Name string `json:"name"` - NumQubits int `json:"num_qubits,omitempty"` - Vendor string `json:"vendor,omitempty"` - QRMIType string `json:"qrmi_type,omitempty"` -} - -// QuantumConfig is the on-disk config that adds quantum resources to the graph. -type QuantumConfig struct { - Backends []QuantumBackend `json:"backends"` -} - -// LoadQuantumConfig reads a YAML or JSON list of quantum backends. -func LoadQuantumConfig(data []byte) (*QuantumConfig, error) { - var c QuantumConfig - if err := yaml.Unmarshal(data, &c); err != nil { - return nil, fmt.Errorf("parse quantum config: %w", err) - } - return &c, nil -} - // Options configures graph construction. type Options struct { // ClusterName is the root vertex name (default "cluster"). @@ -47,14 +27,20 @@ type Options struct { // GPUResource is the resource name GPUs are advertised under // (default DefaultGPUResource). GPUResource corev1.ResourceName - // Quantum backends to inject under a qgateway. - Quantum []QuantumBackend + // Resources are the configured virtual resource trees to inject. + Resources []Resource // IncludeUnschedulable includes cordoned nodes (default false). IncludeUnschedulable bool } -// BuildGraph turns cluster nodes (plus any configured quantum backends) into a +// BuildGraph turns cluster nodes (plus any configured virtual resources) into a // Fluxion JGF resource graph, returned as JSON ready for FluxionGraph.Init. +// +// Physical nodes are built first, then the configured resource trees are +// appended under their parent vertices. A single contiguous rank counter spans +// physical and virtual node vertices: every node-typed vertex gets a real rank +// (the rv1 writer requires it), and the virtual=true/false property keeps the +// two sets apart at match time. func BuildGraph(nodes []corev1.Node, opts Options) ([]byte, error) { b := jgf.NewBuilder() @@ -69,15 +55,35 @@ func BuildGraph(nodes []corev1.Node, opts Options) ([]byte, error) { cluster := b.AddRoot("cluster", "cluster", jgf.Options{Name: clusterName}) + // rank is the shared, contiguous execution-rank counter across all node + // vertices (physical first, then virtual). + var rank int64 + + // parents maps a vertex name to its builder handle so a configured resource + // tree can attach under a named parent (default the cluster root). + parents := map[string]*jgf.Vertex{clusterName: cluster} + for i := range nodes { n := &nodes[i] if n.Spec.Unschedulable && !opts.IncludeUnschedulable { continue } + // Control-plane nodes are typically tainted (NoSchedule) rather than + // cordoned, so the Unschedulable check above does not catch them. Skip by + // taint, which is name- and type-independent (unlike matching the node + // name), unless the caller opts to include unschedulable nodes. + if isControlPlane(n) && !opts.IncludeUnschedulable { + continue + } + r := rank + rank++ nodeV := b.AddChild(cluster, "node", "node", jgf.Options{ - Name: n.Name, - Properties: map[string]any{"hostname": n.Name}, + Name: n.Name, + Rank: &r, + Properties: map[string]any{"hostname": n.Name}, + NodeProperties: map[string]string{ComposeProperty(VirtualProperty, "false"): ""}, }) + parents[n.Name] = nodeV if cpu := count(n, corev1.ResourceCPU); cpu > 0 { b.AddChild(nodeV, "core", "core", jgf.Options{Size: cpu}) @@ -90,64 +96,28 @@ func BuildGraph(nodes []corev1.Node, opts Options) ([]byte, error) { } } - if len(opts.Quantum) > 0 { - AddQuantum(b, cluster, opts.Quantum) + if err := appendResources(b, parents, clusterName, opts.Resources, &rank); err != nil { + return nil, err } return b.JSON() } -// FluxionResourceNames returns the distinct extended-resource names a device -// plugin should advertise for a set of quantum backends. It uses the SAME -// type-derivation rule as AddQuantum — each backend is a `qpu`, and a backend -// with num_qubits > 0 contributes `qubit` — so the device plugin's advertised -// resources and the graph's resource types are derived from one config and -// cannot drift. Names are prefixed with placement.FluxionResourcePrefix so they -// match what the scheduler strips off a pod request. -func FluxionResourceNames(backends []QuantumBackend) []string { - types := map[string]bool{} - if len(backends) > 0 { - types["qpu"] = true - } - for _, b := range backends { - if b.NumQubits > 0 { - types["qubit"] = true - } - } - names := make([]string, 0, len(types)) - for t := range types { - names = append(names, placement.FluxionResourcePrefix+t) - } - sort.Strings(names) - return names +// controlPlaneTaints are the well-known taint keys Kubernetes places on +// control-plane nodes. They are tainted NoSchedule rather than cordoned, so a +// taint check (not an Unschedulable check) is what excludes them. +var controlPlaneTaints = map[string]bool{ + "node-role.kubernetes.io/control-plane": true, + "node-role.kubernetes.io/master": true, } -// AddQuantum injects a qgateway under the cluster with one qpu vertex per -// backend. Exposed so a graph built elsewhere can be augmented the same way. -func AddQuantum(b *jgf.Builder, cluster *jgf.Vertex, backends []QuantumBackend) { - gw := b.AddChild(cluster, "qgateway", "qgateway", jgf.Options{ - Properties: map[string]any{"vendor": "ibm"}, - }) - for _, be := range backends { - props := map[string]any{"qrmi_type": orDefault(be.QRMIType, "qiskit-runtime-service")} - if be.NumQubits > 0 { - props["num_qubits"] = be.NumQubits - } - if be.Vendor != "" { - props["vendor"] = be.Vendor - } - qpu := b.AddChild(gw, "qpu", "qpu", jgf.Options{ - Name: be.Name, - Exclusive: true, - Properties: props, - }) - // Model qubits as a counted child so a request for N qubits matches a - // backend with at least that many (Fluxion count matching is >=). This - // is how the numeric "at least N qubits" ask is expressed without a - // numeric constraint (RFC 31 properties are boolean tags, not >=). - if be.NumQubits > 0 { - b.AddChild(qpu, "qubit", "qubit", jgf.Options{Size: int64(be.NumQubits)}) +// isControlPlane reports whether a node carries a control-plane taint. +func isControlPlane(n *corev1.Node) bool { + for i := range n.Spec.Taints { + if controlPlaneTaints[n.Spec.Taints[i].Key] { + return true } } + return false } // count reads an integer resource count, preferring allocatable over capacity. @@ -172,10 +142,3 @@ func memoryMB(n *corev1.Node) int64 { } return q.Value() / (1024 * 1024) } - -func orDefault(s, def string) string { - if s == "" { - return def - } - return s -} \ No newline at end of file diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go new file mode 100644 index 0000000..26a8a99 --- /dev/null +++ b/pkg/cluster/cluster_test.go @@ -0,0 +1,40 @@ +package cluster + +import ( + "testing" + + corev1 "k8s.io/api/core/v1" +) + +// isControlPlane detects control-plane nodes by taint, independent of node name +// or type, and only for the well-known control-plane/master taint keys. +func TestIsControlPlane(t *testing.T) { + cp := &corev1.Node{} + cp.Spec.Taints = []corev1.Taint{ + {Key: "node-role.kubernetes.io/control-plane", Effect: corev1.TaintEffectNoSchedule}, + } + if !isControlPlane(cp) { + t.Error("expected control-plane node (control-plane taint) to be detected") + } + + master := &corev1.Node{} + master.Spec.Taints = []corev1.Taint{ + {Key: "node-role.kubernetes.io/master", Effect: corev1.TaintEffectNoSchedule}, + } + if !isControlPlane(master) { + t.Error("expected control-plane node (master taint) to be detected") + } + + worker := &corev1.Node{} + worker.Spec.Taints = []corev1.Taint{ + {Key: "some/other-taint", Effect: corev1.TaintEffectNoSchedule}, + } + if isControlPlane(worker) { + t.Error("worker with an unrelated taint should not be treated as control-plane") + } + + plain := &corev1.Node{} + if isControlPlane(plain) { + t.Error("node with no taints should not be control-plane") + } +} diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go new file mode 100644 index 0000000..9338a2a --- /dev/null +++ b/pkg/cluster/resources.go @@ -0,0 +1,242 @@ +package cluster + +import ( + "encoding/json" + "fmt" + "strings" + + "sigs.k8s.io/yaml" +) + +// This file defines the generic resource-tree configuration that fluence reads +// to inject virtual resources into the Fluxion graph. It is deliberately free +// of any quantum-specific assumptions: a Resource has an arbitrary string type +// and may nest children, so any shape (a qdevice over qpus over qubits, a +// license server, an FPGA pool, ...) is expressible without per-type code. +// +// Two top-level keys: +// +// resources: a forest of resource trees to add to the graph. Each tree is +// attached under a named parent vertex (default "cluster"), so the +// virtual subtree sits alongside the physical "rack" tree without +// being linked to any specific physical node (today). A future +// design could set parent to a node name to link them. +// attributes: a registry of named attribute sets. A Resource's Attributes may +// be given inline (a map) or by reference (a string naming an entry +// here), so a shared set (e.g. a region/connectivity profile) is +// defined once and reused. +// +// Every attribute becomes both a queryable/prunable RFC 31 graph property and an +// injected environment value, so what a user can constrain on, they can also +// read back. (Graph/jobspec wiring lives in later pieces; this file only parses +// and validates.) + +// DefaultParent is the vertex a resource tree attaches under when none is given. +const DefaultParent = "cluster" + +// Resource is one vertex in a configured resource tree. Type is the Fluxion +// resource type (any string); Count is the size at this vertex; With are child +// resources (recursive). Attributes are resolved to a concrete map after load. +type Resource struct { + // Type is the Fluxion vertex type (e.g. "qdevice", "qpu", "qubit"). Required. + Type string `json:"type"` + // Name is an explicit vertex name (e.g. a backend id like "rigetti_cepheus"). + // Optional; the builder derives one from type+index when empty. + Name string `json:"name,omitempty"` + // Count is the resource quantity at this vertex (maps to the graph vertex + // size). Defaults to 1 when zero. + Count int64 `json:"count,omitempty"` + // Parent is the vertex this tree attaches under. Only meaningful on a + // top-level resource (children attach under their parent in With). Defaults + // to DefaultParent ("cluster"). + Parent string `json:"parent,omitempty"` + // Attributes is either an inline map (object) or a reference (string) into + // the top-level attributes registry. Resolved into ResolvedAttributes at + // load time; do not read this field directly afterward. + Attributes AttributeSpec `json:"attributes,omitempty"` + // With are child resources contained by this one (recursive). + With []Resource `json:"with,omitempty"` + + // ResolvedAttributes is the concrete attribute map after reference + // resolution. Populated by LoadResourcesConfig; nil if none. + ResolvedAttributes map[string]string `json:"-"` +} + +// AttributeSpec is a polymorphic attributes field: either a reference (a string +// naming a registry entry) or an inline map. Exactly one form is used. +type AttributeSpec struct { + // Ref is the registry key when attributes were given as a string. + Ref string + // Inline is the attribute map when attributes were given as an object. + Inline map[string]string +} + +// UnmarshalJSON accepts either a JSON string (reference) or object (inline map). +// (sigs.k8s.io/yaml converts YAML to JSON first, so this covers YAML too.) +func (a *AttributeSpec) UnmarshalJSON(data []byte) error { + if len(data) == 0 || string(data) == "null" { + return nil + } + switch data[0] { + case '"': + var s string + if err := json.Unmarshal(data, &s); err != nil { + return err + } + a.Ref = s + return nil + case '{': + m := map[string]string{} + if err := json.Unmarshal(data, &m); err != nil { + return err + } + a.Inline = m + return nil + default: + return fmt.Errorf("attributes must be a string reference or a map, got: %s", data) + } +} + +// empty reports whether no attributes were specified. +func (a AttributeSpec) empty() bool { + return a.Ref == "" && len(a.Inline) == 0 +} + +// ResourcesConfig is the on-disk configuration: a forest of resource trees plus +// a registry of named attribute sets they may reference. +type ResourcesConfig struct { + Resources []Resource `json:"resources"` + Attributes map[string]map[string]string `json:"attributes,omitempty"` +} + +// LoadResourcesConfig parses a YAML or JSON resources configuration and resolves +// every resource's attributes (inline or by reference) into a concrete map. A +// reference to an unknown attribute set is a hard error, as is a resource with +// no type. +func LoadResourcesConfig(data []byte) (*ResourcesConfig, error) { + var c ResourcesConfig + if err := yaml.Unmarshal(data, &c); err != nil { + return nil, fmt.Errorf("parse resources config: %w", err) + } + // Guard against a silent empty graph: if the file has real content (not just + // whitespace/comments) but produced zero resources, the schema almost + // certainly doesn't match (e.g. a top-level key other than "resources:", like + // an older "backends:" layout). Building a classical-only graph silently in + // that case hides the misconfiguration — the configured devices simply never + // appear in the graph. A genuinely empty file (an optional configmap not yet + // populated) is fine and stays classical-only. + if len(c.Resources) == 0 && hasContent(data) { + return nil, fmt.Errorf( + "resources config has content but defines no resources: " + + "expected a top-level 'resources:' list (check the schema)") + } + for i := range c.Resources { + if err := resolveResource(&c.Resources[i], c.Attributes, true); err != nil { + return nil, err + } + } + return &c, nil +} + +// hasContent reports whether data has any non-comment, non-whitespace content, +// so a truly empty config (classical-only) is distinguished from a malformed or +// schema-mismatched one. +func hasContent(data []byte) bool { + for _, line := range strings.Split(string(data), "\n") { + t := strings.TrimSpace(line) + if t == "" || strings.HasPrefix(t, "#") { + continue + } + return true + } + return false +} + +// resolveResource validates a resource and resolves its attributes, recursing +// into children. topLevel resources get a default parent. +func resolveResource(r *Resource, registry map[string]map[string]string, topLevel bool) error { + return resolveResourceInherited(r, registry, topLevel, nil) +} + +// resolveResourceInherited resolves a resource's attributes and threads them +// down to children. A child inherits every attribute of its parent, then applies +// its own: a key the child also sets overrides the inherited value, and a key the +// child sets to the empty string clears it (removed from the resolved set). This +// makes a constraint that selects a nested node by class AND filters on an +// attribute reachable — every node on the path (parent down to the target) +// carries the attribute, so none is pruned mid-descent. +func resolveResourceInherited( + r *Resource, + registry map[string]map[string]string, + topLevel bool, + inherited map[string]string, +) error { + if r.Type == "" { + return fmt.Errorf("resource is missing required field 'type'") + } + if topLevel && r.Parent == "" { + r.Parent = DefaultParent + } + + // own holds this resource's explicitly-set attributes (before inheritance). + var own map[string]string + if !r.Attributes.empty() { + resolved, err := resolveAttributes(r.Attributes, registry, r.Type) + if err != nil { + return err + } + own = resolved + } + + // Merge: start from inherited, apply own (override), honor explicit-clear. + merged := make(map[string]string, len(inherited)+len(own)) + for k, v := range inherited { + merged[k] = v + } + for k, v := range own { + if v == "" { + delete(merged, k) // explicit clear + continue + } + merged[k] = v + } + if len(merged) > 0 { + r.ResolvedAttributes = merged + } else { + r.ResolvedAttributes = nil + } + + for i := range r.With { + if err := resolveResourceInherited(&r.With[i], registry, false, merged); err != nil { + return err + } + } + return nil +} + +// resolveAttributes turns an AttributeSpec into a concrete map: inline is used +// directly; a reference is looked up in the registry (error if absent). +func resolveAttributes( + spec AttributeSpec, + registry map[string]map[string]string, + resourceType string, +) (map[string]string, error) { + if spec.Ref != "" { + set, ok := registry[spec.Ref] + if !ok { + return nil, fmt.Errorf( + "resource %q references unknown attribute set %q", resourceType, spec.Ref) + } + // Copy so callers can't mutate the shared registry entry. + out := make(map[string]string, len(set)) + for k, v := range set { + out[k] = v + } + return out, nil + } + out := make(map[string]string, len(spec.Inline)) + for k, v := range spec.Inline { + out[k] = v + } + return out, nil +} diff --git a/pkg/cluster/resources_graph.go b/pkg/cluster/resources_graph.go new file mode 100644 index 0000000..d5a5e25 --- /dev/null +++ b/pkg/cluster/resources_graph.go @@ -0,0 +1,197 @@ +package cluster + +import ( + "fmt" + "sort" + + "github.com/converged-computing/fluence/pkg/jgf" + "github.com/converged-computing/fluence/pkg/placement" +) + +// Property keys fluence stamps on graph vertices. VirtualProperty marks whether +// a node vertex is a configured virtual resource (true) or a physical compute +// node (false); it is the discriminator the jobspec constrains on and that +// placement uses to tell a bind target from a backend. ClassProperty preserves +// the configured resource type when a virtual resource is modeled as a node, so +// the original type (e.g. "qdevice") is not lost. +const ( + VirtualProperty = "virtual" + ClassProperty = "class" +) + +// ComposeProperty encodes a key/value as the single "key=value" string that +// Fluxion stores as a property key. RFC 31 PropertyConstraint matching is +// key-presence only — it never compares the value half of a vertex property — +// so to filter on a value the value must be part of the key. This is also +// exactly how the rv1 match writer emits properties (prop + "=" + value), so the +// encoding round-trips symmetrically between what we put in and what we read +// back from an allocation. A bare tag (empty value) composes to just "key". +func ComposeProperty(key, value string) string { + if value == "" { + return key + } + return key + "=" + value +} + +// virtualProperties builds the property SET (composed key=value strings, stored +// as JGF property keys with empty values) for a configured virtual resource: the +// virtual marker, the class of this resource AND of every descendant in its +// subtree, and each user attribute namespaced under the Fluxion resource prefix. +// Every entry is filterable by an identical composed string in a jobspec +// constraint. +// +// Descendant classes are included because a jobspec constraint is GLOBAL and is +// evaluated against every node vertex the matcher descends through (RFC 31 +// constraints prune a node, and its subtree, on mismatch). To select a nested +// node by class (e.g. class=qpu where qpu sits under a qdevice node), every +// ancestor node on the path must also satisfy class=qpu — so each node carries +// the classes of everything beneath it. The trees are shallow, so these sets +// stay small. A class= constraint then reaches any node of that type +// because every ancestor down to it advertises that class. +func virtualProperties(res *Resource) map[string]string { + props := map[string]string{ + ComposeProperty(VirtualProperty, "true"): "", + } + for t := range subtreeClasses(res) { + props[ComposeProperty(ClassProperty, t)] = "" + } + for k, v := range res.ResolvedAttributes { + props[ComposeProperty(placement.FluxionResourcePrefix+k, v)] = "" + } + return props +} + +// subtreeClasses returns the set of resource types in res's subtree, including +// res itself — the classes a node must advertise so a constraint selecting any +// type within the subtree is not pruned at this node on the way down. +func subtreeClasses(res *Resource) map[string]bool { + types := map[string]bool{} + var walk func(r *Resource) + walk = func(r *Resource) { + types[r.Type] = true + for i := range r.With { + walk(&r.With[i]) + } + } + walk(res) + return types +} + +// appendResources attaches each configured resource tree under its parent in +// the builder, continuing the shared node-rank counter. parents maps vertex +// names (physical nodes + the cluster root) to their builder handles. +func appendResources( + b *jgf.Builder, + parents map[string]*jgf.Vertex, + clusterName string, + resources []Resource, + rank *int64, +) error { + for i := range resources { + res := &resources[i] + parentName := res.Parent + if parentName == "" { + parentName = clusterName + } + parent, ok := parents[parentName] + if !ok { + return fmt.Errorf( + "resource %q references unknown parent vertex %q", res.Type, parentName) + } + if err := addResource(b, parent, res, rank); err != nil { + return err + } + } + return nil +} + +// addResource adds one configured resource, at ANY depth, as a node vertex +// carrying virtual=true, its configured type as the class property, and its +// attributes as RFC 31 properties — then recurses on its children the same way. +// +// Every level is a node so that every level is independently selectable by a +// class= constraint (RFC 31 property constraints match only node and +// storage_node vertices, so a non-node vertex could never be filtered). The +// vertex basename stays the configured type, so jgf auto-names it (qpu0, qubit0) +// and class= is meaningful; only the graph TYPE is "node". +// +// Every node consumes a real, unique rank from the shared counter. Ranks must be +// real (not -1) for any node vertex; the virtual sub-resources therefore appear +// in the rv1 nodelist, which is fine here because the allocation is a hold whose +// response we read directly — we never dispatch work to those ranks (no +// flux-core execution). +func addResource(b *jgf.Builder, parent *jgf.Vertex, res *Resource, rank *int64) error { + r := *rank + *rank++ + size := res.Count + if size == 0 { + size = 1 + } + v := b.AddChild(parent, "node", res.Type, jgf.Options{ + Name: res.Name, // empty -> jgf auto-names as + Size: size, + Rank: &r, + NodeProperties: virtualProperties(res), + }) + + for i := range res.With { + if err := addResource(b, v, &res.With[i], rank); err != nil { + return err + } + } + return nil +} + +// FluxionResourceNames returns the distinct extended-resource names a device +// plugin should advertise for the configured resources: every counted resource +// TYPE under a top-level resource (the top-level virtual node is matched via the +// FluxionResourceNames returns the distinct extended-resource names a device +// plugin should advertise: every resource TYPE at every level of the configured +// trees. Every virtual resource is a node selectable by class=, so every +// type — top-level (qdevice) and nested (qpu, qubit) alike — is requestable. +// Prefixed so they match what the scheduler strips off a pod request. +func FluxionResourceNames(resources []Resource) []string { + types := map[string]bool{} + for i := range resources { + collectTypes(&resources[i], types) + } + names := make([]string, 0, len(types)) + for t := range types { + names = append(names, placement.FluxionResourcePrefix+t) + } + sort.Strings(names) + return names +} + +func collectTypes(res *Resource, types map[string]bool) { + types[res.Type] = true + for i := range res.With { + collectTypes(&res.With[i], types) + } +} + +// AttributeKeys returns the sorted union of every user attribute key across all +// configured resources (and their children). This is the set of attributes that +// may be injected into a workload's environment; computing it from the config +// (rather than hardcoding) means the env contract automatically tracks whatever +// attributes the backends declare. The backend identity itself is separate +// (FLUXION_BACKEND) and not included here. +func AttributeKeys(resources []Resource) []string { + keys := map[string]bool{} + var walk func(rs []Resource) + walk = func(rs []Resource) { + for i := range rs { + for k := range rs[i].ResolvedAttributes { + keys[k] = true + } + walk(rs[i].With) + } + } + walk(resources) + out := make([]string, 0, len(keys)) + for k := range keys { + out = append(out, k) + } + sort.Strings(out) + return out +} diff --git a/pkg/cluster/resources_graph_test.go b/pkg/cluster/resources_graph_test.go new file mode 100644 index 0000000..9c7339a --- /dev/null +++ b/pkg/cluster/resources_graph_test.go @@ -0,0 +1,221 @@ +package cluster + +import ( + "encoding/json" + "testing" + + "github.com/converged-computing/fluence/pkg/jgf" +) + +// buildResourceNodes appends the given config's resource trees to a fresh graph +// (simulating physical nodes already having consumed ranks 0..startRank-1) and +// returns the emitted vertex metadata for assertions. +func buildResourceNodes(t *testing.T, cfg string, startRank int64) []map[string]any { + t.Helper() + c, err := LoadResourcesConfig([]byte(cfg)) + if err != nil { + t.Fatalf("load: %v", err) + } + b := jgf.NewBuilder() + cluster := b.AddRoot("cluster", "cluster", jgf.Options{Name: "cluster"}) + parents := map[string]*jgf.Vertex{"cluster": cluster} + rank := startRank + if err := appendResources(b, parents, "cluster", c.Resources, &rank); err != nil { + t.Fatalf("append: %v", err) + } + raw, _ := b.JSON() + var doc struct { + Graph struct { + Nodes []struct { + Metadata map[string]any `json:"metadata"` + } `json:"nodes"` + } `json:"graph"` + } + if err := json.Unmarshal(raw, &doc); err != nil { + t.Fatalf("unmarshal: %v\n%s", err, raw) + } + out := []map[string]any{} + for _, n := range doc.Graph.Nodes { + out = append(out, n.Metadata) + } + return out +} + +func metaByName(nodes []map[string]any, name string) map[string]any { + for _, m := range nodes { + if m["name"] == name { + return m + } + } + return nil +} + +// A configured virtual resource is modeled as a node vertex carrying +// virtual=true, the class of itself AND its descendants, its (inherited) +// attributes as namespaced RFC 31 properties, and a real rank. EVERY level +// (qdevice, qpu, qubit) is a node so each is independently selectable by a +// class= constraint. +func TestVirtualResourceModeledAsNode(t *testing.T) { + cfg := ` +resources: + - type: qdevice + name: rigetti_cepheus + count: 1 + attributes: + region: us-east-1 + with: + - type: qpu + count: 1 + with: + - type: qubit + count: 80 +` + nodes := buildResourceNodes(t, cfg, 3) // ranks 0-2 taken by physical nodes + + dev := metaByName(nodes, "rigetti_cepheus") + if dev == nil { + t.Fatal("rigetti_cepheus vertex not found") + } + if dev["type"] != "node" { + t.Errorf("virtual device type = %v, want node", dev["type"]) + } + if dev["rank"] != float64(3) { + t.Errorf("virtual device rank = %v, want 3 (continues physical ranks)", dev["rank"]) + } + props, ok := dev["properties"].(map[string]any) + if !ok { + t.Fatalf("no properties object: %#v", dev["properties"]) + } + // Properties are composed key=value strings (RFC 31 match is key-presence + // only, so the value must be in the key), stored with empty values. + if _, ok := props["virtual=true"]; !ok { + t.Errorf("missing virtual=true property; got %#v", props) + } + // The qdevice node carries its own class AND its descendants' classes, so a + // class=qpu / class=qubit constraint is not pruned at this node on the way + // down to a nested target. + for _, want := range []string{"class=qdevice", "class=qpu", "class=qubit"} { + if _, ok := props[want]; !ok { + t.Errorf("missing %q property on qdevice node; got %#v", want, props) + } + } + if _, ok := props["fluxion.flux-framework.org/region=us-east-1"]; !ok { + t.Errorf("missing composed region property; got %#v", props) + } + + // The qpu is now ALSO a node, with class=qpu (and its descendant class=qubit), + // virtual=true, and the inherited region attribute, on its own real rank. + qpu := metaByName(nodes, "qpu0") + if qpu == nil { + t.Fatal("qpu child (qpu0) not found") + } + if qpu["type"] != "node" { + t.Errorf("qpu type = %v, want node (every level is a node)", qpu["type"]) + } + if qpu["rank"] == float64(-1) { + t.Errorf("qpu rank = -1, want a real rank (it is a node)") + } + qprops, _ := qpu["properties"].(map[string]any) + for _, want := range []string{"class=qpu", "class=qubit", "virtual=true", "fluxion.flux-framework.org/region=us-east-1"} { + if _, ok := qprops[want]; !ok { + t.Errorf("qpu node missing %q (class self+descendant, virtual, inherited attr); got %#v", want, qprops) + } + } + // The qpu must NOT advertise class=qdevice (that is an ancestor, not in its + // subtree) — otherwise class=qdevice would wrongly select the qpu too. + if _, ok := qprops["class=qdevice"]; ok { + t.Errorf("qpu node should not carry class=qdevice (ancestor class); got %#v", qprops) + } +} + +// FluxionResourceNames includes every type at every level (qdevice, qpu, qubit), +// because every virtual resource is a node selectable by class=. +func TestFluxionResourceNamesIncludesAllLevels(t *testing.T) { + cfg := ` +resources: + - type: qdevice + name: d + with: + - type: qpu + count: 1 + with: + - type: qubit + count: 4 +` + c, err := LoadResourcesConfig([]byte(cfg)) + if err != nil { + t.Fatal(err) + } + names := FluxionResourceNames(c.Resources) + want := map[string]bool{ + "fluxion.flux-framework.org/qdevice": true, + "fluxion.flux-framework.org/qpu": true, + "fluxion.flux-framework.org/qubit": true, + } + if len(names) != len(want) { + t.Fatalf("names = %v, want all of %v", names, want) + } + for _, n := range names { + if !want[n] { + t.Errorf("unexpected name %q", n) + } + } +} + +// An unknown parent reference is a hard error. +func TestUnknownParentErrors(t *testing.T) { + cfg := ` +resources: + - type: qpu + name: orphan + parent: nonexistent-node +` + c, err := LoadResourcesConfig([]byte(cfg)) + if err != nil { + t.Fatal(err) + } + b := jgf.NewBuilder() + cluster := b.AddRoot("cluster", "cluster", jgf.Options{Name: "cluster"}) + parents := map[string]*jgf.Vertex{"cluster": cluster} + var rank int64 + if err := appendResources(b, parents, "cluster", c.Resources, &rank); err == nil { + t.Fatal("expected an error for an unknown parent") + } +} + +// AttributeKeys returns the sorted union of attribute keys across all resources +// and children, deduplicated — this is the env-injection contract. +func TestAttributeKeysUnion(t *testing.T) { + cfg := ` +resources: + - type: qdevice + name: a + attributes: + region: us-east-1 + connectivity: all-to-all + with: + - type: qpu + attributes: + qubits: "80" + - type: qdevice + name: b + attributes: + region: us-west-2 + vendor: ibm +` + c, err := LoadResourcesConfig([]byte(cfg)) + if err != nil { + t.Fatal(err) + } + keys := AttributeKeys(c.Resources) + // union: connectivity, qubits, region, vendor (region deduped) + want := []string{"connectivity", "qubits", "region", "vendor"} + if len(keys) != len(want) { + t.Fatalf("keys = %v, want %v", keys, want) + } + for i := range want { + if keys[i] != want[i] { + t.Fatalf("keys = %v, want %v", keys, want) + } + } +} diff --git a/pkg/cluster/resources_test.go b/pkg/cluster/resources_test.go new file mode 100644 index 0000000..b90370f --- /dev/null +++ b/pkg/cluster/resources_test.go @@ -0,0 +1,214 @@ +package cluster + +import ( + "testing" +) + +// A realistic config: a virtual qdevice (under the default parent) holding a qpu +// child, with attributes given by reference, plus a second device with inline +// attributes. Exercises nesting, count, the parent default, and both attribute +// forms. +const sampleConfig = ` +resources: + - type: qdevice + name: rigetti_cepheus + attributes: aws-east + with: + - type: qpu + count: 1 + attributes: aws-east + with: + - type: qubit + count: 80 + - type: qdevice + name: ibm_marrakesh + parent: cluster + attributes: + region: us-west-2 + vendor: ibm + +attributes: + aws-east: + region: us-east-1 + connectivity: all-to-all +` + +func TestLoadResourcesConfig(t *testing.T) { + c, err := LoadResourcesConfig([]byte(sampleConfig)) + if err != nil { + t.Fatalf("load: %v", err) + } + if len(c.Resources) != 2 { + t.Fatalf("got %d top-level resources, want 2", len(c.Resources)) + } + + rigetti := c.Resources[0] + if rigetti.Type != "qdevice" || rigetti.Name != "rigetti_cepheus" { + t.Errorf("first resource = %+v", rigetti) + } + // Parent defaults to cluster when unset. + if rigetti.Parent != DefaultParent { + t.Errorf("rigetti parent = %q, want %q", rigetti.Parent, DefaultParent) + } + // Referenced attributes resolved. + if rigetti.ResolvedAttributes["region"] != "us-east-1" || + rigetti.ResolvedAttributes["connectivity"] != "all-to-all" { + t.Errorf("rigetti attributes = %v", rigetti.ResolvedAttributes) + } + + // Nested child + count. + if len(rigetti.With) != 1 || rigetti.With[0].Type != "qpu" { + t.Fatalf("rigetti child = %+v", rigetti.With) + } + qpu := rigetti.With[0] + if qpu.Count != 1 { + t.Errorf("qpu count = %d, want 1", qpu.Count) + } + if qpu.ResolvedAttributes["region"] != "us-east-1" { + t.Errorf("qpu resolved attributes = %v", qpu.ResolvedAttributes) + } + if len(qpu.With) != 1 || qpu.With[0].Type != "qubit" || qpu.With[0].Count != 80 { + t.Errorf("qubit child = %+v", qpu.With) + } + + // Inline attributes on the second device. + ibm := c.Resources[1] + if ibm.ResolvedAttributes["region"] != "us-west-2" || ibm.ResolvedAttributes["vendor"] != "ibm" { + t.Errorf("ibm attributes = %v", ibm.ResolvedAttributes) + } +} + +// A referenced attribute set that doesn't exist is a hard error, not a silent +// empty map. +func TestUnknownAttributeReferenceErrors(t *testing.T) { + cfg := ` +resources: + - type: qdevice + name: x + attributes: does-not-exist +` + _, err := LoadResourcesConfig([]byte(cfg)) + if err == nil { + t.Fatal("expected an error for an unknown attribute reference") + } +} + +// A resource missing 'type' is a hard error. +func TestMissingTypeErrors(t *testing.T) { + cfg := ` +resources: + - name: x +` + _, err := LoadResourcesConfig([]byte(cfg)) + if err == nil { + t.Fatal("expected an error for a resource with no type") + } +} + +// Resolved attribute maps are copies, so mutating one resource's map does not +// bleed into the shared registry or another resource referencing the same set. +func TestReferencedAttributesAreCopied(t *testing.T) { + cfg := ` +resources: + - type: a + attributes: shared + - type: b + attributes: shared +attributes: + shared: + k: v +` + c, err := LoadResourcesConfig([]byte(cfg)) + if err != nil { + t.Fatalf("load: %v", err) + } + c.Resources[0].ResolvedAttributes["k"] = "mutated" + if c.Resources[1].ResolvedAttributes["k"] != "v" { + t.Errorf("mutation bled across resources: %v", c.Resources[1].ResolvedAttributes) + } +} + +// A resource with no attributes leaves ResolvedAttributes nil (not an empty map +// we'd then emit as an empty properties object). +func TestNoAttributesIsNil(t *testing.T) { + cfg := ` +resources: + - type: node + name: plain +` + c, err := LoadResourcesConfig([]byte(cfg)) + if err != nil { + t.Fatalf("load: %v", err) + } + if c.Resources[0].ResolvedAttributes != nil { + t.Errorf("expected nil attributes, got %v", c.Resources[0].ResolvedAttributes) + } +} + +// A config with content but no resources: key (e.g. an older "backends:" layout) +// must error loudly rather than silently yielding an empty graph. +func TestLoadResourcesConfigRejectsSchemaMismatch(t *testing.T) { + _, err := LoadResourcesConfig([]byte("backends:\n - name: ibm_fez\n num_qubits: 156\n")) + if err == nil { + t.Fatal("a non-empty config defining no resources should error (schema mismatch)") + } +} + +// A genuinely empty or comment-only config stays classical-only (no error). +func TestLoadResourcesConfigEmptyIsClassical(t *testing.T) { + for _, in := range []string{"", " \n\n", "# only a comment\n"} { + c, err := LoadResourcesConfig([]byte(in)) + if err != nil { + t.Fatalf("empty/comment config %q should be OK, got: %v", in, err) + } + if len(c.Resources) != 0 { + t.Fatalf("empty config %q should yield 0 resources", in) + } + } +} + +// Attributes inherit downward: a child gets the parent's attributes, can override +// a key, and can clear one by setting it to empty. This makes a nested class + +// attribute constraint reachable (every node on the path carries the attribute). +func TestAttributeInheritance(t *testing.T) { + cfg := ` +resources: + - type: qdevice + name: d + attributes: + region: us-east-1 + vendor: ibm + with: + - type: qpu + attributes: + vendor: rigetti + region: "" + with: + - type: qubit + count: 4 +` + c, err := LoadResourcesConfig([]byte(cfg)) + if err != nil { + t.Fatal(err) + } + dev := c.Resources[0] + if dev.ResolvedAttributes["region"] != "us-east-1" || dev.ResolvedAttributes["vendor"] != "ibm" { + t.Fatalf("qdevice attrs = %v", dev.ResolvedAttributes) + } + qpu := dev.With[0] + // vendor overridden, region cleared (explicit ""), nothing else + if qpu.ResolvedAttributes["vendor"] != "rigetti" { + t.Errorf("qpu vendor = %q, want rigetti (override)", qpu.ResolvedAttributes["vendor"]) + } + if _, ok := qpu.ResolvedAttributes["region"]; ok { + t.Errorf("qpu region should be cleared (explicit empty); got %v", qpu.ResolvedAttributes) + } + // qubit inherits qpu's resolved attrs (vendor=rigetti, no region) + qubit := qpu.With[0] + if qubit.ResolvedAttributes["vendor"] != "rigetti" { + t.Errorf("qubit should inherit vendor=rigetti; got %v", qubit.ResolvedAttributes) + } + if _, ok := qubit.ResolvedAttributes["region"]; ok { + t.Errorf("qubit should not have region (cleared at qpu); got %v", qubit.ResolvedAttributes) + } +} diff --git a/pkg/fluence/fluence.go b/pkg/fluence/fluence.go index 5edbbf4..439e0d1 100644 --- a/pkg/fluence/fluence.go +++ b/pkg/fluence/fluence.go @@ -5,11 +5,14 @@ import ( "fmt" "log" "os" + "sort" "strconv" + "strings" "sync" "github.com/converged-computing/fluence/pkg/cluster" "github.com/converged-computing/fluence/pkg/graph" + "github.com/converged-computing/fluence/pkg/jobspec" "github.com/converged-computing/fluence/pkg/placement" corev1 "k8s.io/api/core/v1" @@ -46,24 +49,32 @@ type matcher interface { // the graph itself is rebuilt fresh on restart. type groupAlloc struct { place placement.Placement - jobid uint64 + // jobids are the Fluxion allocations backing this group — one per match + // (compute, plus one per requested virtual device). All are held (duration 0) + // and cancelled together; the group is all-or-nothing across them. + jobids []uint64 } // Fluence is a scheduler-framework plugin that places whole pod groups by // matching them against a flux-sched resource graph built from the live cluster -// (plus any configured quantum resources). Gang/all-or-nothing semantics are +// (plus any configured virtual resources). Gang/all-or-nothing semantics are // delegated to the native PodGroup API; Fluence only decides placement. type Fluence struct { handle fwk.Handle matcher matcher + // knownDevices is the set of virtual resource types the graph models (suffix + // only, e.g. "qpu"), used to reject a pod requesting a device that does not + // exist before issuing a match. Empty when no resources are configured. + knownDevices map[string]bool + // matcherMu serializes all access to the cgo Fluxion client, which is not // thread-safe. Match runs on the (sequential) scheduling path; Cancel runs in // informer handler goroutines, so the two can race without this. matcherMu sync.Mutex mu sync.Mutex - // placement maps a group key to its allocation (nodes, backend, jobid). + // placement maps a group key to its allocation (nodes, backend, jobids). placement map[string]groupAlloc } @@ -94,15 +105,22 @@ func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error // the resources add-on is applied, so a missing file is classical-only, not // fatal. opts := cluster.Options{} + knownDevices := map[string]bool{} if path := os.Getenv("FLUENCE_RESOURCES"); path != "" { raw, err := os.ReadFile(path) switch { case err == nil: - qc, err := cluster.LoadQuantumConfig(raw) + rc, err := cluster.LoadResourcesConfig(raw) if err != nil { return nil, err } - opts.Quantum = qc.Backends + opts.Resources = rc.Resources + // The requestable device types are the FluxionResourceNames, minus + // the prefix — the suffixes a pod requests as + // fluxion.flux-framework.org/. + for _, name := range cluster.FluxionResourceNames(rc.Resources) { + knownDevices[strings.TrimPrefix(name, placement.FluxionResourcePrefix)] = true + } case os.IsNotExist(err): // No resources config mounted -> classical-only graph. default: @@ -114,7 +132,9 @@ func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error if err != nil { return nil, fmt.Errorf("build resource graph: %w", err) } - fmt.Printf("Fluence resource graph:\n%s", jgfBytes) + fmt.Println("[fluence] === RESOURCE GRAPH (knownDevices=" + + fmt.Sprintf("%v", keysOf(knownDevices)) + ") ===") + fmt.Println(string(jgfBytes)) // FluxionGraph.Init reads from a file path, so stage the generated graph. tmp, err := os.CreateTemp("", "fluence-graph-*.json") @@ -130,13 +150,20 @@ func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error // scheduling key is the same JGF vertex subgraph we parse for placement, and // it carries the execution view flux uses to replay an allocation on restart. // This is the format we persist and feed back to UpdateAllocate for recovery. - fluxion := &graph.FluxionGraph{MatchFormat: "rv1"} + // jgf match format: emits every allocated vertex (with properties) as a graph, + // regardless of type. rv1 cannot represent our allocations — its R_lite is + // built from core/gpu "reducer" children under a node, so a virtual allocation + // that bottoms out in nodes (no cores) serializes to an empty R. jgf has no + // such assumption and is exactly what PlacementFromAllocation parses (node + // vertices + their composed marker/attribute properties). + fluxion := &graph.FluxionGraph{MatchFormat: "jgf"} fluxion.Init(tmp.Name(), os.Getenv("FLUENCE_MATCH_POLICY"), "") f := &Fluence{ - handle: h, - matcher: fluxion, - placement: map[string]groupAlloc{}, + handle: h, + matcher: fluxion, + knownDevices: knownDevices, + placement: map[string]groupAlloc{}, } f.registerCancelHandlers() return f, nil @@ -168,38 +195,22 @@ func (f *Fluence) PreFilter( return nil, fwk.AsStatus(err) } - js, err := placement.JobspecForGroup(group, pods) - if err != nil { - return nil, fwk.AsStatus(err) - } - specYAML, err := js.YAML() + specs, err := placement.JobspecsForGroup(group, pods, f.knownDevices) if err != nil { return nil, fwk.AsStatus(err) } - fmt.Printf("Attempting to match:\n%s\n", specYAML) - f.matcherMu.Lock() - req, err := f.matcher.MatchAllocateSpec(specYAML) - f.matcherMu.Unlock() - if err != nil { - fmt.Printf("FAIL Match failed: %s\n", err) - return nil, fwk.NewStatus(fwk.Unschedulable, fmt.Sprintf("fluxion match failed: %v", err)) + // Run every jobspec as an independent held allocation (duration 0). The group + // is all-or-nothing: if any match fails, cancel the ones that already + // succeeded so we never hold a partial allocation (e.g. compute without its + // device, or vice versa). + place, jobids, status := f.matchGroup(specs) + if !status.IsSuccess() { + return nil, status } - place, err := placement.PlacementFromAllocation(req.Allocation) - if err != nil { - fmt.Printf("FAIL Placement failed: %s\n", err) - return nil, fwk.AsStatus(err) - } - if len(place.Nodes) == 0 && place.Backend == "" { - fmt.Println("FAIL No nodes") - return nil, fwk.NewStatus(fwk.Unschedulable, "fluxion returned no allocation") - } - // A quantum-only allocation has a Backend but no Nodes (a qpu vertex lives - // under the qgateway, not under a compute node). That is valid — the backend - // is reachable from any node — so Filter imposes no node constraint then. f.mu.Lock() - f.placement[group] = groupAlloc{place: place, jobid: req.Number} + f.placement[group] = groupAlloc{place: place, jobids: jobids} f.mu.Unlock() // The jobid (for cancel) and any backend (for the webhook env) are written @@ -207,6 +218,91 @@ func (f *Fluence) PreFilter( return nil, fwk.NewStatus(fwk.Success) } +// matchGroup runs each jobspec as an independent held Fluxion allocation and +// combines them into one placement. It is all-or-nothing: on the first failure +// it cancels every allocation already made and returns an Unschedulable status, +// so the group never holds a partial set (compute without its device, etc.). +// +// The combined placement unions the per-match results: the compute match +// supplies the bind nodes, a device match supplies the backend identity. (The +// per-match split of nodes vs backend is PlacementFromAllocation's job; here we +// merge.) +func (f *Fluence) matchGroup(specs []*jobspec.Jobspec) (placement.Placement, []uint64, *fwk.Status) { + var combined placement.Placement + var jobids []uint64 + + for i, js := range specs { + // Render the jobspec as JSON, not YAML. flux-sched's RFC 31 constraint + // parser requires each property to be a QUOTED scalar (it checks the YAML + // tag == "!"); sigs.k8s.io/yaml emits property strings unquoted + // (e.g. "- virtual=false"), which the parser rejects with "non-string + // property specified" -> the whole match fails with -1. JSON always quotes + // strings, and JSON is valid YAML input to the matcher, so this is the + // reliable encoding for jobspecs that carry constraints. + spec, err := js.JSON() + if err != nil { + f.cancelJobids(jobids) + return placement.Placement{}, nil, fwk.AsStatus(err) + } + + fmt.Println(fmt.Sprintf("[fluence] === MATCH %d/%d: submitting jobspec to fluxion ===", i+1, len(specs))) + fmt.Println(spec) + + f.matcherMu.Lock() + req, err := f.matcher.MatchAllocateSpec(spec) + f.matcherMu.Unlock() + if err != nil { + log.Printf("[fluence] MATCH %d/%d FAILED: %v — rolling back jobids %v", + i+1, len(specs), err, jobids) + f.cancelJobids(jobids) + return placement.Placement{}, nil, fwk.NewStatus( + fwk.Unschedulable, fmt.Sprintf("fluxion match failed: %v", err)) + } + + fmt.Println(fmt.Sprintf("[fluence] MATCH %d/%d allocated jobid %d; fluxion R:", i+1, len(specs), req.Number)) + fmt.Println(req.Allocation) + + place, err := placement.PlacementFromAllocation(req.Allocation) + if err != nil { + log.Printf("[fluence] MATCH %d/%d placement-parse FAILED: %v", i+1, len(specs), err) + f.cancelJobids(append(jobids, req.Number)) + return placement.Placement{}, nil, fwk.AsStatus(err) + } + fmt.Println(fmt.Sprintf("[fluence] MATCH %d/%d parsed: nodes=%v backend=%q attrs=%v", + i+1, len(specs), place.Nodes, place.Backend, place.BackendAttributes)) + + jobids = append(jobids, req.Number) + combined.Nodes = append(combined.Nodes, place.Nodes...) + if place.Backend != "" { + combined.Backend = place.Backend + combined.BackendAttributes = place.BackendAttributes + } + } + + if len(combined.Nodes) == 0 && combined.Backend == "" { + log.Printf("[fluence] match produced no nodes and no backend — unschedulable") + f.cancelJobids(jobids) + return placement.Placement{}, nil, fwk.NewStatus( + fwk.Unschedulable, "fluxion returned no allocation") + } + fmt.Println(fmt.Sprintf("[fluence] GROUP MATCHED: nodes=%v backend=%q attrs=%v jobids=%v", + combined.Nodes, combined.Backend, combined.BackendAttributes, jobids)) + return combined, jobids, fwk.NewStatus(fwk.Success) +} + +// cancelJobids frees a set of held allocations, used to unwind a partial group +// match. Cancel is idempotent and best-effort; errors are logged, not returned. +func (f *Fluence) cancelJobids(jobids []uint64) { + for _, id := range jobids { + f.matcherMu.Lock() + err := f.matcher.Cancel(id) + f.matcherMu.Unlock() + if err != nil { + log.Printf("fluence: rollback cancel of jobid %d failed: %v", id, err) + } + } +} + // PreFilterExtensions: no add/remove pod handling for now. func (f *Fluence) PreFilterExtensions() fwk.PreFilterExtensions { return nil } @@ -275,21 +371,32 @@ func (f *Fluence) PreBind( return fwk.NewStatus(fwk.Success) // not ours; nothing to record } - if err := f.recordJobID(ctx, pod, alloc.jobid); err != nil { - return fwk.AsStatus(fmt.Errorf("record jobid: %w", err)) + if err := f.recordJobIDs(ctx, pod, alloc.jobids); err != nil { + return fwk.AsStatus(fmt.Errorf("record jobids: %w", err)) } if alloc.place.Backend != "" { - if err := f.patchPodAnnotation(ctx, pod.Namespace, pod.Name, placement.BackendAnnotation, alloc.place.Backend); err != nil { - return fwk.AsStatus(fmt.Errorf("stamp backend annotation: %w", err)) + // Stamp the backend name and all matched attributes in one patch. The + // webhook injects a normalized env per annotation so the workload reads + // exactly what it matched (backend + region/qubits/...). + ann := map[string]string{placement.BackendAnnotation: alloc.place.Backend} + for k, v := range alloc.place.BackendAttributes { + ann[placement.AttributeAnnotationPrefix+k] = v + } + log.Printf("[fluence] group %s -> backend %q attrs %v (nodes %v, jobids %v)", + groupKey(pod), alloc.place.Backend, alloc.place.BackendAttributes, + alloc.place.Nodes, alloc.jobids) + if err := f.patchPodAnnotations(ctx, pod.Namespace, pod.Name, ann); err != nil { + return fwk.AsStatus(fmt.Errorf("stamp backend annotations: %w", err)) } } return fwk.NewStatus(fwk.Success) } -// recordJobID writes the jobid annotation onto the allocation's owning object: a -// grouped pod's allocation belongs to the PodGroup; an ungrouped pod owns its own. -func (f *Fluence) recordJobID(ctx context.Context, pod *corev1.Pod, jobid uint64) error { - val := strconv.FormatUint(jobid, 10) +// recordJobIDs writes the jobid annotation (a comma-separated list of all the +// group's held allocations) onto the allocation's owning object: a grouped pod's +// allocation belongs to the PodGroup; an ungrouped pod owns its own. +func (f *Fluence) recordJobIDs(ctx context.Context, pod *corev1.Pod, jobids []uint64) error { + val := formatJobIDs(jobids) if group := placement.PodGroupName(pod); group != "" { patch := fmt.Sprintf(`{"metadata":{"annotations":{%q:%q}}}`, placement.JobIDAnnotation, val) _, err := f.handle.ClientSet().SchedulingV1alpha2().PodGroups(pod.Namespace).Patch( @@ -299,8 +406,36 @@ func (f *Fluence) recordJobID(ctx context.Context, pod *corev1.Pod, jobid uint64 return f.patchPodAnnotation(ctx, pod.Namespace, pod.Name, placement.JobIDAnnotation, val) } +// keysOf returns the keys of a set, for logging. +func keysOf(m map[string]bool) []string { + out := make([]string, 0, len(m)) + for k := range m { + out = append(out, k) + } + sort.Strings(out) + return out +} + +// formatJobIDs renders jobids as a comma-separated string for the annotation. +func formatJobIDs(jobids []uint64) string { + parts := make([]string, len(jobids)) + for i, id := range jobids { + parts[i] = strconv.FormatUint(id, 10) + } + return strings.Join(parts, ",") +} + func (f *Fluence) patchPodAnnotation(ctx context.Context, ns, name, key, val string) error { - patch := fmt.Sprintf(`{"metadata":{"annotations":{%q:%q}}}`, key, val) + return f.patchPodAnnotations(ctx, ns, name, map[string]string{key: val}) +} + +// patchPodAnnotations merges a set of annotations onto a pod in one patch. +func (f *Fluence) patchPodAnnotations(ctx context.Context, ns, name string, ann map[string]string) error { + parts := make([]string, 0, len(ann)) + for k, v := range ann { + parts = append(parts, fmt.Sprintf("%q:%q", k, v)) + } + patch := fmt.Sprintf(`{"metadata":{"annotations":{%s}}}`, strings.Join(parts, ",")) _, err := f.handle.ClientSet().CoreV1().Pods(ns).Patch( ctx, name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}) return err @@ -355,12 +490,12 @@ func (f *Fluence) onPodDeleted(obj interface{}) { f.cancelGroup(pod.Namespace+"/"+pod.Name, pod.Annotations) } -// cancelGroup frees the allocation for a deleted owning object. The jobid comes +// cancelGroup frees all allocations for a deleted owning object. The jobids come // from the object's annotation (the durable source of truth); if it is missing // (e.g. deleted between PreFilter and PreBind, before the annotation was // written) it falls back to the in-memory memo by key. Cancel is idempotent. func (f *Fluence) cancelGroup(key string, ann map[string]string) { - jobid, ok := parseJobID(ann) + jobids, ok := parseJobIDs(ann) if !ok { f.mu.Lock() alloc, found := f.placement[key] @@ -368,14 +503,16 @@ func (f *Fluence) cancelGroup(key string, ann map[string]string) { if !found { return // never scheduled by us, or already cancelled } - jobid = alloc.jobid + jobids = alloc.jobids } - f.matcherMu.Lock() - err := f.matcher.Cancel(jobid) - f.matcherMu.Unlock() - if err != nil { - log.Printf("fluence: cancel jobid %d for %s failed: %v", jobid, key, err) + for _, jobid := range jobids { + f.matcherMu.Lock() + err := f.matcher.Cancel(jobid) + f.matcherMu.Unlock() + if err != nil { + log.Printf("fluence: cancel jobid %d for %s failed: %v", jobid, key, err) + } } f.mu.Lock() @@ -383,16 +520,29 @@ func (f *Fluence) cancelGroup(key string, ann map[string]string) { f.mu.Unlock() } -func parseJobID(ann map[string]string) (uint64, bool) { +// parseJobIDs reads the comma-separated jobid annotation into a slice. Returns +// false when the annotation is absent or empty. +func parseJobIDs(ann map[string]string) ([]uint64, bool) { raw := ann[placement.JobIDAnnotation] if raw == "" { - return 0, false + return nil, false } - jobid, err := strconv.ParseUint(raw, 10, 64) - if err != nil { - return 0, false + var jobids []uint64 + for _, part := range strings.Split(raw, ",") { + part = strings.TrimSpace(part) + if part == "" { + continue + } + id, err := strconv.ParseUint(part, 10, 64) + if err != nil { + return nil, false + } + jobids = append(jobids, id) + } + if len(jobids) == 0 { + return nil, false } - return jobid, true + return jobids, true } // groupPods returns the pods belonging to the same native PodGroup as pod diff --git a/pkg/fluence/fluence_test.go b/pkg/fluence/fluence_test.go index 87e5a3e..136bf1b 100644 --- a/pkg/fluence/fluence_test.go +++ b/pkg/fluence/fluence_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/converged-computing/fluence/pkg/graph" + "github.com/converged-computing/fluence/pkg/jobspec" "github.com/converged-computing/fluence/pkg/placement" corev1 "k8s.io/api/core/v1" @@ -15,13 +16,28 @@ import ( // fakeMatcher records Cancel calls so cancel behavior can be asserted without // the real cgo/flux matcher. It satisfies the package-internal matcher interface. +// matchResults/matchErrs are consumed in order, one per MatchAllocateSpec call, +// to script multi-match (compute + device) scenarios. type fakeMatcher struct { - cancelled []uint64 - cancelErr error + cancelled []uint64 + cancelErr error + matchN int + matchResults []graph.MatchAllocateRequest + matchErrs []error } func (m *fakeMatcher) MatchAllocateSpec(string) (graph.MatchAllocateRequest, error) { - return graph.MatchAllocateRequest{}, nil + i := m.matchN + m.matchN++ + var res graph.MatchAllocateRequest + if i < len(m.matchResults) { + res = m.matchResults[i] + } + var err error + if i < len(m.matchErrs) { + err = m.matchErrs[i] + } + return res, err } func (m *fakeMatcher) Cancel(jobid uint64) error { @@ -56,25 +72,37 @@ func ungroupedPod(ns, name string, annotations map[string]string) *corev1.Pod { } } -func TestParseJobID(t *testing.T) { +func TestParseJobIDs(t *testing.T) { cases := []struct { name string ann map[string]string - want uint64 + want []uint64 wantOK bool }{ - {"present", ann("42"), 42, true}, - {"absent", map[string]string{}, 0, false}, - {"nil map", nil, 0, false}, - {"empty value", ann(""), 0, false}, - {"garbage", ann("not-a-number"), 0, false}, - {"zero", ann("0"), 0, true}, + {"single", ann("42"), []uint64{42}, true}, + {"multiple", ann("42,7"), []uint64{42, 7}, true}, + {"spaces", ann("42, 7 ,9"), []uint64{42, 7, 9}, true}, + {"absent", map[string]string{}, nil, false}, + {"nil map", nil, nil, false}, + {"empty value", ann(""), nil, false}, + {"garbage", ann("not-a-number"), nil, false}, + {"zero", ann("0"), []uint64{0}, true}, } for _, c := range cases { t.Run(c.name, func(t *testing.T) { - got, ok := parseJobID(c.ann) - if got != c.want || ok != c.wantOK { - t.Fatalf("parseJobID(%v) = (%d,%t), want (%d,%t)", c.ann, got, ok, c.want, c.wantOK) + got, ok := parseJobIDs(c.ann) + if ok != c.wantOK { + t.Fatalf("parseJobIDs(%v) ok = %t, want %t", c.ann, ok, c.wantOK) + } + if ok { + if len(got) != len(c.want) { + t.Fatalf("parseJobIDs(%v) = %v, want %v", c.ann, got, c.want) + } + for i := range got { + if got[i] != c.want[i] { + t.Fatalf("parseJobIDs(%v) = %v, want %v", c.ann, got, c.want) + } + } } }) } @@ -94,7 +122,7 @@ func TestGroupKey(t *testing.T) { func TestCancelGroupPrefersAnnotation(t *testing.T) { m := &fakeMatcher{} f := newTestFluence(m) - f.placement["default/training"] = groupAlloc{jobid: 42} + f.placement["default/training"] = groupAlloc{jobids: []uint64{42}} f.cancelGroup("default/training", ann("99")) @@ -111,7 +139,7 @@ func TestCancelGroupPrefersAnnotation(t *testing.T) { func TestCancelGroupMemoFallback(t *testing.T) { m := &fakeMatcher{} f := newTestFluence(m) - f.placement["default/solo"] = groupAlloc{jobid: 7} + f.placement["default/solo"] = groupAlloc{jobids: []uint64{7}} f.cancelGroup("default/solo", nil) @@ -137,7 +165,7 @@ func TestCancelGroupUnknownNoop(t *testing.T) { func TestCancelGroupIdempotent(t *testing.T) { m := &fakeMatcher{} f := newTestFluence(m) - f.placement["default/solo"] = groupAlloc{jobid: 7} + f.placement["default/solo"] = groupAlloc{jobids: []uint64{7}} f.cancelGroup("default/solo", nil) // frees, deletes memo f.cancelGroup("default/solo", nil) // memo gone, no annotation -> no-op @@ -151,7 +179,7 @@ func TestCancelGroupIdempotent(t *testing.T) { func TestCancelGroupMatcherErrorStillDeletes(t *testing.T) { m := &fakeMatcher{cancelErr: errors.New("flux boom")} f := newTestFluence(m) - f.placement["default/solo"] = groupAlloc{jobid: 7} + f.placement["default/solo"] = groupAlloc{jobids: []uint64{7}} f.cancelGroup("default/solo", ann("7")) @@ -244,3 +272,70 @@ func TestOnPodGroupDeletedTombstone(t *testing.T) { t.Fatalf("cancelled = %v, want [4] from PodGroup tombstone", m.cancelled) } } + +// matchGroup combines a compute allocation (nodes) and a device allocation +// (backend) into one placement and records both jobids. +func TestMatchGroupCombinesAllocations(t *testing.T) { + m := &fakeMatcher{ + matchResults: []graph.MatchAllocateRequest{ + {Number: 10, Allocation: `{"graph":{"nodes":[{"metadata":{"type":"node","name":"node-a","properties":{"virtual=false":""}}}]}}`}, + {Number: 11, Allocation: `{"graph":{"nodes":[{"metadata":{"type":"node","name":"rigetti","properties":{"virtual=true":""}}}]}}`}, + }, + } + f := newTestFluence(m) + + place, jobids, status := f.matchGroup(twoSpecs()) + if !status.IsSuccess() { + t.Fatalf("status = %v, want success", status) + } + if len(jobids) != 2 || jobids[0] != 10 || jobids[1] != 11 { + t.Fatalf("jobids = %v, want [10 11]", jobids) + } + // Both allocations contributed (exact node/backend split is + // PlacementFromAllocation's job, refined in the placement rewrite). + if len(place.Nodes) == 0 { + t.Errorf("expected at least one bind node, got %v", place.Nodes) + } +} + +// If a later match fails, every already-successful allocation in the group is +// cancelled (all-or-nothing) and the group is Unschedulable. +func TestMatchGroupAllOrNothing(t *testing.T) { + m := &fakeMatcher{ + matchResults: []graph.MatchAllocateRequest{ + {Number: 20, Allocation: `{"graph":{"nodes":[{"metadata":{"type":"node","name":"node-a","properties":{"virtual=false":""}}}]}}`}, + {}, + }, + matchErrs: []error{nil, errors.New("no qpu available")}, + } + f := newTestFluence(m) + + _, _, status := f.matchGroup(twoSpecs()) + if status.IsSuccess() { + t.Fatal("expected Unschedulable when the device match fails") + } + if len(m.cancelled) != 1 || m.cancelled[0] != 20 { + t.Fatalf("cancelled = %v, want [20] (roll back the successful compute match)", m.cancelled) + } +} + +// A multi-jobid group records a comma-separated annotation and cancels every id. +func TestCancelGroupMultipleJobids(t *testing.T) { + m := &fakeMatcher{} + f := newTestFluence(m) + + f.cancelGroup("default/training", ann("10,11")) + + if len(m.cancelled) != 2 || m.cancelled[0] != 10 || m.cancelled[1] != 11 { + t.Fatalf("cancelled = %v, want [10 11]", m.cancelled) + } +} + +// twoSpecs returns two minimal jobspecs (compute + device) to drive matchGroup +// tests; their content doesn't matter since the fake matcher scripts results. +func twoSpecs() []*jobspec.Jobspec { + return []*jobspec.Jobspec{ + {Version: 9999}, + {Version: 9999}, + } +} diff --git a/pkg/graph/allocation.go b/pkg/graph/allocation.go index b3ff161..2d77784 100644 --- a/pkg/graph/allocation.go +++ b/pkg/graph/allocation.go @@ -5,15 +5,22 @@ import ( "fmt" ) -// graphVertices is the subset of a JGF graph we need: the type and name of each +// vertexMeta is the subset of a JGF vertex metadata fluence needs from an +// allocation: type, name, and the RFC 31 properties object. Properties are +// stored as composed "key=value" keys (matching how the graph is built and how +// the rv1 writer emits them); the value half of the JSON object is unused. +type vertexMeta struct { + Type string `json:"type"` + Name string `json:"name"` + Properties map[string]string `json:"properties"` +} + +// graphVertices is the subset of a JGF graph we need: the metadata of each // vertex. It appears at the top level of a "jgf" allocation, and under the // "scheduling" key of an "rv1" allocation. type graphVertices struct { Nodes []struct { - Metadata struct { - Type string `json:"type"` - Name string `json:"name"` - } `json:"metadata"` + Metadata vertexMeta `json:"metadata"` } `json:"nodes"` } @@ -35,44 +42,78 @@ type allocation struct { // vertices returns the allocated vertices regardless of match format, preferring // the rv1 scheduling key and falling back to a top-level jgf graph. -func (a allocation) vertices() []struct { - Metadata struct { - Type string `json:"type"` - Name string `json:"name"` - } `json:"metadata"` -} { +func (a allocation) vertices() []vertexMeta { + src := a.Graph.Nodes if len(a.Scheduling.Graph.Nodes) > 0 { - return a.Scheduling.Graph.Nodes + src = a.Scheduling.Graph.Nodes + } + out := make([]vertexMeta, 0, len(src)) + for _, n := range src { + out = append(out, n.Metadata) } - return a.Graph.Nodes + return out } -// BackendFromAllocation returns the name of the first vertex of vertexType -// (e.g. "qpu" or "node") in a Fluxion allocation. -func BackendFromAllocation(alloc string, vertexType string) (string, error) { - names, err := NamesFromAllocation(alloc, vertexType) - if err != nil { - return "", err +// AllocatedNode is a node-typed vertex from an allocation, with the properties +// fluence classifies it by (virtual=true/false, class=..., user attributes). +type AllocatedNode struct { + Name string + Properties map[string]string +} + +// HasProperty reports whether the node carries the given composed property key +// (e.g. "virtual=true"). +func (n AllocatedNode) HasProperty(key string) bool { + _, ok := n.Properties[key] + return ok +} + +// NodesFromAllocation returns every node-typed vertex in an allocation together +// with its properties, so callers can classify them (physical vs virtual) by the +// composed marker keys rather than by vertex type. Under the virtual-resource +// model every allocatable thing is type "node"; the virtual marker, not the +// type, distinguishes a compute node from a virtual backend. +func NodesFromAllocation(alloc string) ([]AllocatedNode, error) { + var a allocation + if err := json.Unmarshal([]byte(alloc), &a); err != nil { + return nil, fmt.Errorf("parse allocation: %w", err) } - if len(names) == 0 { - return "", fmt.Errorf("no %q vertex found in allocation", vertexType) + var nodes []AllocatedNode + for _, v := range a.vertices() { + if v.Type == "node" { + nodes = append(nodes, AllocatedNode{Name: v.Name, Properties: v.Properties}) + } } - return names[0], nil + return nodes, nil } // NamesFromAllocation returns the names of every vertex of vertexType in a -// Fluxion allocation (jgf or rv1). Used to map an allocation onto cluster node -// names (vertexType "node") for pod placement, or onto quantum backends ("qpu"). +// Fluxion allocation (jgf or rv1). Retained for callers that key on a concrete +// vertex type (e.g. counting "qpu" children); placement now classifies node +// vertices via NodesFromAllocation and the virtual marker instead. func NamesFromAllocation(alloc string, vertexType string) ([]string, error) { var a allocation if err := json.Unmarshal([]byte(alloc), &a); err != nil { return nil, fmt.Errorf("parse allocation: %w", err) } var names []string - for _, n := range a.vertices() { - if n.Metadata.Type == vertexType { - names = append(names, n.Metadata.Name) + for _, v := range a.vertices() { + if v.Type == vertexType { + names = append(names, v.Name) } } return names, nil } + +// BackendFromAllocation returns the name of the first vertex of vertexType +// (e.g. "qpu" or "node") in a Fluxion allocation. +func BackendFromAllocation(alloc string, vertexType string) (string, error) { + names, err := NamesFromAllocation(alloc, vertexType) + if err != nil { + return "", err + } + if len(names) == 0 { + return "", fmt.Errorf("no %q vertex found in allocation", vertexType) + } + return names[0], nil +} diff --git a/pkg/graph/allocation_test.go b/pkg/graph/allocation_test.go index dc67118..1ddbfca 100644 --- a/pkg/graph/allocation_test.go +++ b/pkg/graph/allocation_test.go @@ -37,3 +37,39 @@ func TestNamesFromAllocationRV1(t *testing.T) { t.Fatalf("rv1 qpu parse: %q %v", be, err) } } + +// markedAlloc has node vertices carrying composed virtual markers plus a qpu +// child — the shape PlacementFromAllocation classifies. +const markedAlloc = `{"graph":{"nodes":[ + {"metadata":{"type":"node","name":"kind-worker","properties":{"virtual=false":""}}}, + {"metadata":{"type":"node","name":"rigetti","properties":{"virtual=true":"","class=qdevice":""}}}, + {"metadata":{"type":"qpu","name":"qpu0"}}]}}` + +// NodesFromAllocation returns only node-typed vertices, each with its composed +// property keys, so callers classify by the virtual marker rather than the type. +func TestNodesFromAllocation(t *testing.T) { + nodes, err := NodesFromAllocation(markedAlloc) + if err != nil { + t.Fatal(err) + } + if len(nodes) != 2 { + t.Fatalf("got %d node vertices, want 2 (qpu is not a node)", len(nodes)) + } + + byName := map[string]AllocatedNode{} + for _, n := range nodes { + byName[n.Name] = n + } + if !byName["kind-worker"].HasProperty("virtual=false") { + t.Errorf("kind-worker missing virtual=false: %v", byName["kind-worker"].Properties) + } + if !byName["rigetti"].HasProperty("virtual=true") { + t.Errorf("rigetti missing virtual=true: %v", byName["rigetti"].Properties) + } + if !byName["rigetti"].HasProperty("class=qdevice") { + t.Errorf("rigetti missing class=qdevice: %v", byName["rigetti"].Properties) + } + if byName["kind-worker"].HasProperty("virtual=true") { + t.Error("kind-worker should not carry virtual=true") + } +} diff --git a/pkg/jgf/jgf.go b/pkg/jgf/jgf.go index ca9bd98..05b0507 100644 --- a/pkg/jgf/jgf.go +++ b/pkg/jgf/jgf.go @@ -10,7 +10,6 @@ import ( "encoding/json" "fmt" "sort" - "strings" ) const containment = "containment" @@ -29,6 +28,8 @@ type Vertex struct { unit string exclusive bool props map[string]any + nodeProps map[string]string + rank int64 } // Name is the vertex name (basename + per-type index, or an explicit name). @@ -61,6 +62,20 @@ type Options struct { // Properties are extra metadata fields merged into the vertex (e.g. // num_qubits, vendor). They are emitted alongside the standard fields. Properties map[string]any + // NodeProperties are RFC 31 resource properties, emitted as a nested + // "properties" object in the vertex metadata. Unlike Properties (which + // flattens descriptive fields into the top level), these populate + // resource_t.properties in Fluxion and are what the matcher's constraint + // pruning (by_constraint) and queries match against. Keys/values are + // strings; a bare tag is value "" (matching is by key presence). + NodeProperties map[string]string + // Rank is the flux execution rank for a node-typed vertex. The rv1 match + // writer builds its R_lite/nodelist from node vertices keyed by rank, and + // cannot emit a vertex whose rank is -1, so every node (physical or virtual) + // needs a real, distinct, non-negative rank. Nil leaves the default (-1), + // which is correct for non-node vertices (cluster, core, memory, ...) that + // never appear in the nodelist. + Rank *int64 } // AddRoot creates a top-level vertex (typically the cluster). @@ -98,6 +113,11 @@ func (b *Builder) add(parent *Vertex, typ, basename string, opts Options) *Verte unit: opts.Unit, exclusive: opts.Exclusive, props: opts.Properties, + nodeProps: opts.NodeProperties, + rank: -1, + } + if opts.Rank != nil { + v.rank = *opts.Rank } if parent == nil { v.path = "/" + name @@ -128,7 +148,7 @@ func (m metadata) MarshalJSON() ([]byte, error) { "name": m.v.name, "id": m.v.id, "uniq_id": m.v.uniqID, - "rank": -1, + "rank": m.v.rank, "exclusive": m.v.exclusive, "unit": m.v.unit, "size": m.v.size, @@ -140,6 +160,16 @@ func (m metadata) MarshalJSON() ([]byte, error) { out[k] = val } } + // RFC 31 resource properties: a nested object Fluxion loads into + // resource_t.properties (what constraint pruning and queries match on). + // Emitted only when present so vertices without properties are unchanged. + if len(m.v.nodeProps) > 0 { + props := make(map[string]string, len(m.v.nodeProps)) + for k, val := range m.v.nodeProps { + props[k] = val + } + out["properties"] = props + } // Stable key order for deterministic output. keys := make([]string, 0, len(out)) for k := range out { @@ -180,19 +210,15 @@ type graph struct { Edges []edge `json:"edges"` } -// Doc is a complete JGF document. -type Doc struct { +// Document is a complete JGF document. +type Document struct { Graph graph `json:"graph"` } -// Doc assembles the accumulated vertices and edges into a JGF document. -func (b *Builder) Doc() Doc { - d := Doc{} +// Document assembles the accumulated vertices and edges into a JGF document. +func (b *Builder) Document() Document { + d := Document{} for _, v := range b.vertices { - // Skip the control plane - if strings.Contains(v.name, "control-plane") { - continue - } d.Graph.Nodes = append(d.Graph.Nodes, node{ID: v.key, Metadata: metadata{v: v}}) } for _, e := range b.edges { @@ -207,5 +233,5 @@ func (b *Builder) Doc() Doc { // JSON renders the document as indented JGF. func (b *Builder) JSON() ([]byte, error) { - return json.MarshalIndent(b.Doc(), "", " ") + return json.MarshalIndent(b.Document(), "", " ") } diff --git a/pkg/jgf/jgf_test.go b/pkg/jgf/jgf_test.go new file mode 100644 index 0000000..89bf4b2 --- /dev/null +++ b/pkg/jgf/jgf_test.go @@ -0,0 +1,109 @@ +package jgf + +import ( + "encoding/json" + "testing" +) + +// decodeNodes parses the builder's JSON into a slice of vertex metadata maps, +// keyed for easy assertions. +func decodeNodes(t *testing.T, b *Builder) []map[string]any { + t.Helper() + raw, err := b.JSON() + if err != nil { + t.Fatalf("JSON: %v", err) + } + var doc struct { + Graph struct { + Nodes []struct { + Metadata map[string]any `json:"metadata"` + } `json:"nodes"` + } `json:"graph"` + } + if err := json.Unmarshal(raw, &doc); err != nil { + t.Fatalf("unmarshal: %v\n%s", err, raw) + } + out := make([]map[string]any, 0, len(doc.Graph.Nodes)) + for _, n := range doc.Graph.Nodes { + out = append(out, n.Metadata) + } + return out +} + +func findByName(nodes []map[string]any, name string) map[string]any { + for _, m := range nodes { + if m["name"] == name { + return m + } + } + return nil +} + +// A vertex given NodeProperties emits a nested "properties" object with those +// keys/values — the RFC 31 form Fluxion matches constraints against. +func TestNodePropertiesEmittedAsNestedObject(t *testing.T) { + b := NewBuilder() + cluster := b.AddRoot("cluster", "cluster", Options{Name: "cluster"}) + b.AddChild(cluster, "node", "node", Options{ + Name: "qdevice0", + NodeProperties: map[string]string{ + "virtual": "true", + "fluxion.flux-framework.org/region": "us-east-1", + }, + }) + + meta := findByName(decodeNodes(t, b), "qdevice0") + if meta == nil { + t.Fatal("qdevice0 vertex not found") + } + props, ok := meta["properties"].(map[string]any) + if !ok { + t.Fatalf("properties is not a nested object: %#v", meta["properties"]) + } + if props["virtual"] != "true" { + t.Errorf("properties[virtual] = %v, want true", props["virtual"]) + } + if props["fluxion.flux-framework.org/region"] != "us-east-1" { + t.Errorf("properties[region] = %v, want us-east-1", props["fluxion.flux-framework.org/region"]) + } +} + +// A vertex without NodeProperties emits no "properties" key at all, so existing +// graphs are byte-for-byte unchanged. +func TestNoNodePropertiesMeansNoPropertiesKey(t *testing.T) { + b := NewBuilder() + cluster := b.AddRoot("cluster", "cluster", Options{Name: "cluster"}) + b.AddChild(cluster, "node", "node", Options{Name: "node0"}) + + meta := findByName(decodeNodes(t, b), "node0") + if meta == nil { + t.Fatal("node0 vertex not found") + } + if _, present := meta["properties"]; present { + t.Errorf("expected no properties key, got %#v", meta["properties"]) + } +} + +// NodeProperties is distinct from Properties: the latter still flattens into the +// top level, the former nests under "properties". +func TestPropertiesVsNodeProperties(t *testing.T) { + b := NewBuilder() + cluster := b.AddRoot("cluster", "cluster", Options{Name: "cluster"}) + b.AddChild(cluster, "qpu", "qpu", Options{ + Name: "sv1", + Properties: map[string]any{"vendor": "amazon"}, // flattened, descriptive + NodeProperties: map[string]string{"region": "us-east-1"}, // nested, matchable + }) + + meta := findByName(decodeNodes(t, b), "sv1") + if meta == nil { + t.Fatal("sv1 vertex not found") + } + if meta["vendor"] != "amazon" { + t.Errorf("expected flattened vendor=amazon at top level, got %v", meta["vendor"]) + } + props, ok := meta["properties"].(map[string]any) + if !ok || props["region"] != "us-east-1" { + t.Errorf("expected nested properties.region=us-east-1, got %#v", meta["properties"]) + } +} diff --git a/pkg/placement/placement.go b/pkg/placement/placement.go index 397b0a9..9648cda 100644 --- a/pkg/placement/placement.go +++ b/pkg/placement/placement.go @@ -27,8 +27,41 @@ const ( // the pod itself for an ungrouped pod — so the allocation can be cancelled // when that object is deleted, and replayed on scheduler restart. JobIDAnnotation = "fluence.flux-framework.org/jobid" + + // AttributeAnnotationPrefix namespaces the matched backend's attributes when + // the scheduler stamps them onto the pod (e.g. + // fluence.flux-framework.org/attr-region=us-east-1). The webhook injects one + // downward-API env per such annotation so the workload reads exactly what it + // matched. Kept distinct from FluxionResourcePrefix (which marks resource + // *requests*) so request and result annotations never collide. + AttributeAnnotationPrefix = "fluence.flux-framework.org/attr-" + + // EnvVarPrefix is the normalized environment-variable namespace injected into + // the workload. The backend name becomes BACKEND; each attribute + // becomes (uppercased). A vendor-agnostic container reads + // these common names regardless of which backend it matched. + EnvVarPrefix = "FLUXION_" ) +// EnvVarName maps an attribute key to its normalized environment-variable name: +// uppercased, non-alphanumeric runes to underscores, under EnvVarPrefix. E.g. +// "region" -> "FLUXION_REGION", "price/min" -> "FLUXION_PRICE_MIN". +func EnvVarName(attrKey string) string { + var b strings.Builder + b.WriteString(EnvVarPrefix) + for _, r := range attrKey { + switch { + case r >= 'a' && r <= 'z': + b.WriteRune(r - 'a' + 'A') + case (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9'): + b.WriteRune(r) + default: + b.WriteByte('_') + } + } + return b.String() +} + // PodGroupName returns the native (Kubernetes 1.36) scheduling-group name a pod // belongs to, from spec.schedulingGroup.podGroupName, or "" if the pod is not // part of a group. This is the first-class field that links a Pod to its @@ -40,14 +73,37 @@ func PodGroupName(pod *corev1.Pod) string { return "" } +// ComputeProperty / VirtualProperty mirror the markers cluster stamps on graph +// node vertices (composed key=value form). A compute jobspec constrains to +// virtual=false nodes; a device jobspec constrains to virtual=true nodes. Kept +// here (not imported from cluster) to avoid an import cycle: cluster imports +// placement. +const ( + // VirtualPropertyTrue selects configured virtual resource nodes. + VirtualPropertyTrue = "virtual=true" + // VirtualPropertyFalse selects physical compute nodes. + VirtualPropertyFalse = "virtual=false" + // ClassPropertyPrefix composes a class= constraint selecting a virtual node by + // its configured resource type (e.g. class=qpu). Mirrors how the graph builder + // stamps the class property; kept here (not imported from cluster) to avoid an + // import cycle (cluster imports placement). + ClassPropertyPrefix = "class=" +) + +// ComposeClassProperty builds the class= constraint string for a resource type. +func ComposeClassProperty(resourceType string) string { + return ClassPropertyPrefix + resourceType +} + +// computeTypes are the Fluxion graph types that live under a physical +// (virtual=false) compute node. Everything requested via the +// fluxion.flux-framework.org/ prefix is a virtual device type instead. +var computeTypes = map[string]bool{"core": true, "memory": true, "gpu": true} + // podResources distills a pod's container requests into Fluxion resource counts -// keyed by Fluxion graph type (e.g. "core", "gpu", "qpu", "qubit"). -// -// Kubernetes names its native resources (cpu, memory, nvidia.com/gpu), so those -// get a small fixed mapping to graph types. Every resource named -// fluxion.flux-framework.org/ is passed through generically as , -// with no knowledge of what the type means — if the graph has it as a count, -// Fluxion will verify it. +// keyed by Fluxion graph type. Native Kubernetes resources (cpu, memory, +// nvidia.com/gpu) map to compute types; every fluxion.flux-framework.org/ +// request passes through generically as (a virtual device type). func podResources(p *corev1.Pod) map[string]int { counts := map[string]int{} for i := range p.Spec.Containers { @@ -66,74 +122,243 @@ func podResources(p *corev1.Pod) map[string]int { } } - // Every pod runs on a node, so always request at least one core, even when the - // pod also asks for an exotic resource (qpu/qubit). Without this a qpu-only pod - // produces a slot with no compute and Fluxion allocates a bare backend with no - // node to land the pod on. + // Every pod runs on a node, so always request at least one core. Without this + // a device-only pod produces a compute slot with no resources and Fluxion has + // no node to land the probing pod on. if counts["core"] == 0 { counts["core"] = 1 } return counts } -// JobspecForGroup builds a Fluxion jobspec for a whole pod group: a slot per pod -// (count = group size), each holding the per-pod resources as `with` entries — -// one per requested Fluxion type. A hybrid pod (e.g. cores + a qpu) produces a -// slot with both, so classical and quantum are requested together. The group is -// assumed homogeneous (same shape per pod); heterogeneous groups are a TODO. -func JobspecForGroup(groupName string, pods []corev1.Pod) (*jobspec.Jobspec, error) { - if len(pods) == 0 { - return nil, fmt.Errorf("pod group %q has no pods", groupName) +// splitResources separates a pod's resource counts into the compute resources +// (core/memory/gpu, satisfied by a virtual=false node) and the virtual device +// resources (everything requested via the fluxion prefix, satisfied by a +// virtual=true node). Returns counts keyed by graph type for each group. +func splitResources(counts map[string]int) (compute, devices map[string]int) { + compute = map[string]int{} + devices = map[string]int{} + for t, c := range counts { + if c <= 0 { + continue + } + if computeTypes[t] { + compute[t] = c + } else { + devices[t] = c + } } - counts := podResources(&pods[0]) + return compute, devices +} - // Deterministic order for stable jobspecs/tests. +// withEntries renders a count map into sorted jobspec `with` entries (stable +// output for tests and reproducible jobspecs). +func withEntries(counts map[string]int) []jobspec.Resource { types := make([]string, 0, len(counts)) for t := range counts { types = append(types, t) } sort.Strings(types) - var with []jobspec.Resource for _, t := range types { - if counts[t] > 0 { - with = append(with, jobspec.Resource{Type: t, Count: counts[t]}) - } + with = append(with, jobspec.Resource{Type: t, Count: counts[t]}) } + return with +} + +// systemAttributes builds the attributes.system block: a hold-until-cancel +// allocation (duration 0 runs to graph end) plus an RFC 31 property constraint +// selecting the eligible node set. properties is the AND-set of composed +// key=value property strings a matched node must carry. +func systemAttributes(properties []string) map[string]interface{} { + return map[string]interface{}{ + "system": map[string]interface{}{ + // duration 0 => hold the allocation until we explicitly Cancel. + "duration": 0, + "constraints": map[string]interface{}{ + "properties": properties, + }, + }, + } +} +// computeJobspec builds the physical-compute jobspec for a group: one slot per +// pod holding the compute resources, constrained to virtual=false nodes. This is +// the only jobspec for a group that requests no virtual devices. +func computeJobspec(groupName string, slots int, compute map[string]int) *jobspec.Jobspec { return &jobspec.Jobspec{ Version: 9999, Resources: []jobspec.Resource{{ Type: "slot", - Count: len(pods), + Count: slots, Label: "default", - With: with, + With: withEntries(compute), }}, + Attributes: systemAttributes([]string{VirtualPropertyFalse}), Tasks: []jobspec.Task{{ Command: []string{groupName}, Slot: "default", Count: map[string]int{"per_slot": 1}, }}, - }, nil + } } -// Placement is the result of matching a pod group: the cluster nodes to bind to -// (one per slot) and, for quantum groups, the allocated backend. +// deviceJobspec builds a jobspec selecting a single virtual device type. Every +// configured virtual resource is a node carrying class= (and the classes +// of its descendants, so a nested type is reachable), so a device is selected by +// constraining to a virtual node of the requested class. count is the requested +// quantity. +// +// The constraint is virtual=true (scope to virtual backends, not physical nodes) +// AND class= (the requested resource type). The slot requests `node` +// because every virtual resource is a node; the class constraint — not the `with` +// type — picks which one. A nested type (e.g. qpu under a qdevice node) is +// reachable because every ancestor node also advertises class= for the +// types beneath it, so the global constraint does not prune the path. +func deviceJobspec(groupName, deviceType string, count int) *jobspec.Jobspec { + return &jobspec.Jobspec{ + Version: 9999, + Resources: []jobspec.Resource{{ + Type: "slot", + Count: 1, + Label: "device", + With: []jobspec.Resource{{Type: "node", Count: count}}, + }}, + Attributes: systemAttributes([]string{ + VirtualPropertyTrue, + ComposeClassProperty(deviceType), + }), + Tasks: []jobspec.Task{{ + Command: []string{groupName}, + Slot: "device", + Count: map[string]int{"per_slot": 1}, + }}, + } +} + +// JobspecsForGroup builds the set of Fluxion jobspecs to match for a pod group, +// each held independently (duration 0, released by Cancel) and combined all-or- +// nothing by the caller: +// +// - exactly one compute jobspec (slot per pod, virtual=false) — always present, +// so a plain pod or group with no virtual resources yields a single match; +// - one device jobspec per distinct requested virtual resource type +// (constraint virtual=true; the requested type+count rides the slot's `with`). +// +// knownDevices is the set of device types the graph actually models (the +// FluxionResourceNames the device plugin advertises, suffixes only). A request +// for a type not in the graph is a hard error, caught here rather than as an +// opaque match failure. A nil/empty knownDevices with no device requests is +// fine (classical-only). +func JobspecsForGroup( + groupName string, + pods []corev1.Pod, + knownDevices map[string]bool, +) ([]*jobspec.Jobspec, error) { + if len(pods) == 0 { + return nil, fmt.Errorf("pod group %q has no pods", groupName) + } + counts := podResources(&pods[0]) + compute, devices := splitResources(counts) + + specs := []*jobspec.Jobspec{computeJobspec(groupName, len(pods), compute)} + + // Deterministic device order for stable output. + deviceTypes := make([]string, 0, len(devices)) + for t := range devices { + deviceTypes = append(deviceTypes, t) + } + sort.Strings(deviceTypes) + + for _, t := range deviceTypes { + if len(knownDevices) > 0 && !knownDevices[t] { + return nil, fmt.Errorf( + "pod group %q requests virtual resource %q which is not modeled in the resources graph", + groupName, t) + } + if knownDevices == nil { + return nil, fmt.Errorf( + "pod group %q requests virtual resource %q but no resources graph is configured", + groupName, t) + } + specs = append(specs, deviceJobspec(groupName, t, devices[t])) + } + return specs, nil +} + +// Placement is the result of matching one of a group's jobspecs. Nodes are the +// physical (virtual=false) compute nodes a pod binds to; Backend is the virtual +// (virtual=true) resource's identity, surfaced to the pod as env. A single +// allocation yields one or the other: the compute match yields nodes, a device +// match yields a backend. type Placement struct { - Nodes []string // allocated cluster node names - Backend string // allocated qpu/backend name (quantum groups only) + Nodes []string // physical compute node names (virtual=false) + Backend string // virtual backend identity (virtual=true), if any + // BackendAttributes are the matched virtual resource's user attributes + // (region, qubits, ...), decomposed from the backend node's namespaced + // properties. These are injected into the pod as env so the workload sees + // exactly what it matched/queried — the same set that is filterable is also + // readable back. + BackendAttributes map[string]string +} + +// decomposeProperty reverses ComposeProperty: "key=value" -> (key, value, true); +// a bare "key" -> (key, "", true). Used to recover attributes from a backend +// node's composed property keys. +func decomposeProperty(prop string) (key, value string) { + if i := strings.IndexByte(prop, '='); i >= 0 { + return prop[:i], prop[i+1:] + } + return prop, "" } -// PlacementFromAllocation parses a JGF allocation into node and backend names. +// PlacementFromAllocation parses one Fluxion allocation and classifies its +// node-typed vertices by the virtual marker property: virtual=false nodes are +// physical compute (bind targets), virtual=true nodes are virtual backends +// (their name is the backend identity injected into the pod). Everything is +// type "node" now, so the marker — not the vertex type — does the split. +// +// A node carrying neither marker is treated as a physical compute node, so a +// plain graph built without markers still binds correctly. For the chosen +// backend, its user attributes (the fluxion.flux-framework.org/= +// properties) are decomposed into BackendAttributes for env injection. func PlacementFromAllocation(alloc string) (Placement, error) { - nodes, err := graph.NamesFromAllocation(alloc, "node") + nodes, err := graph.NodesFromAllocation(alloc) if err != nil { return Placement{}, err } - backends, _ := graph.NamesFromAllocation(alloc, "qpu") - p := Placement{Nodes: nodes} - if len(backends) > 0 { - p.Backend = backends[0] + var p Placement + for _, n := range nodes { + if n.HasProperty(VirtualPropertyTrue) { + // First virtual node is the backend identity (one backend per group). + if p.Backend != "" { + continue + } + p.Backend = n.Name + p.BackendAttributes = backendAttributes(n.Properties) + continue + } + // virtual=false, or unmarked: a physical compute node to bind. + p.Nodes = append(p.Nodes, n.Name) } return p, nil } + +// backendAttributes extracts the user attributes from a backend node's composed +// property keys: every property of the form +// "fluxion.flux-framework.org/=" becomes -> . Reserved +// markers (virtual=..., class=...) are skipped. +func backendAttributes(props map[string]string) map[string]string { + var attrs map[string]string + for prop := range props { + if !strings.HasPrefix(prop, FluxionResourcePrefix) { + continue + } + key, value := decomposeProperty(strings.TrimPrefix(prop, FluxionResourcePrefix)) + if attrs == nil { + attrs = map[string]string{} + } + attrs[key] = value + } + return attrs +} diff --git a/pkg/placement/placement_test.go b/pkg/placement/placement_test.go index d5e8571..36e3a65 100644 --- a/pkg/placement/placement_test.go +++ b/pkg/placement/placement_test.go @@ -18,7 +18,8 @@ func podWith(name string, req corev1.ResourceList) corev1.Pod { func qty(n int64) resource.Quantity { return *resource.NewQuantity(n, resource.DecimalSI) } -// withType returns the count for a given Fluxion type in the slot's `with`. +// withType returns the count for a given Fluxion type in a jobspec slot's +// `with`. func withType(js *jobspec.Jobspec, t string) (int, bool) { for _, w := range js.Resources[0].With { if w.Type == t { @@ -28,17 +29,51 @@ func withType(js *jobspec.Jobspec, t string) (int, bool) { return 0, false } -func TestClassical(t *testing.T) { +// constraintProps returns the constraint property list from a jobspec's +// attributes.system.constraints.properties. +func constraintProps(t *testing.T, js *jobspec.Jobspec) []string { + t.Helper() + sys, ok := js.Attributes["system"].(map[string]interface{}) + if !ok { + t.Fatalf("no attributes.system: %#v", js.Attributes) + } + cons, ok := sys["constraints"].(map[string]interface{}) + if !ok { + t.Fatalf("no constraints: %#v", sys) + } + props, ok := cons["properties"].([]string) + if !ok { + t.Fatalf("properties not []string: %#v", cons["properties"]) + } + return props +} + +func hasProp(props []string, want string) bool { + for _, p := range props { + if p == want { + return true + } + } + return false +} + +// A classical group (no virtual resources) yields exactly one jobspec: the +// compute slot, constrained to virtual=false. +func TestClassicalSingleMatch(t *testing.T) { pods := []corev1.Pod{ podWith("p0", corev1.ResourceList{corev1.ResourceCPU: qty(4), "nvidia.com/gpu": qty(1)}), podWith("p1", corev1.ResourceList{corev1.ResourceCPU: qty(4), "nvidia.com/gpu": qty(1)}), } - js, err := JobspecForGroup("grp", pods) + specs, err := JobspecsForGroup("grp", pods, nil) if err != nil { t.Fatal(err) } + if len(specs) != 1 { + t.Fatalf("classical group should yield 1 jobspec, got %d", len(specs)) + } + js := specs[0] if js.Resources[0].Count != 2 { - t.Fatalf("slot count = %d, want 2", js.Resources[0].Count) + t.Errorf("slot count = %d, want 2", js.Resources[0].Count) } if c, _ := withType(js, "core"); c != 4 { t.Errorf("core = %d, want 4", c) @@ -46,91 +81,152 @@ func TestClassical(t *testing.T) { if c, _ := withType(js, "gpu"); c != 1 { t.Errorf("gpu = %d, want 1", c) } - if _, ok := withType(js, "qpu"); ok { - t.Error("classical pod should not request qpu") + if !hasProp(constraintProps(t, js), VirtualPropertyFalse) { + t.Errorf("compute jobspec must constrain virtual=false; got %v", constraintProps(t, js)) } } -func TestGenericQuantumCount(t *testing.T) { - // fluxion.flux-framework.org/qpu: 1 -> a qpu count, with no per-type code. - p := podWith("q", corev1.ResourceList{FluxionResourcePrefix + "qpu": qty(1)}) - js, err := JobspecForGroup("qgrp", []corev1.Pod{p}) +// A device request yields a second jobspec constrained to virtual=true (the +// requested type rides the slot's `with`, not a class= constraint), while the +// compute jobspec stays virtual=false and the device type does NOT leak into the +// compute slot. +func TestDeviceProducesSecondMatch(t *testing.T) { + p := podWith("q", corev1.ResourceList{ + corev1.ResourceCPU: qty(1), + FluxionResourcePrefix + "qpu": qty(1), + }) + known := map[string]bool{"qpu": true} + specs, err := JobspecsForGroup("qgrp", []corev1.Pod{p}, known) if err != nil { t.Fatal(err) } - if c, ok := withType(js, "qpu"); !ok || c != 1 { - t.Fatalf("qpu = %d (ok=%v), want 1", c, ok) + if len(specs) != 2 { + t.Fatalf("expected 2 jobspecs (compute + device), got %d", len(specs)) + } + + compute := specs[0] + if !hasProp(constraintProps(t, compute), VirtualPropertyFalse) { + t.Errorf("compute constraint = %v, want virtual=false", constraintProps(t, compute)) + } + if _, ok := withType(compute, "qpu"); ok { + t.Error("qpu must not appear in the compute jobspec") + } + if c, _ := withType(compute, "core"); c != 1 { + t.Errorf("compute core = %d, want 1", c) + } + + device := specs[1] + props := constraintProps(t, device) + if !hasProp(props, VirtualPropertyTrue) { + t.Errorf("device constraint must include virtual=true; got %v", props) + } + // The device is selected by class=: every virtual resource is + // a node carrying its class, so the constraint picks the right one. The slot + // requests a node (every virtual resource is a node), count from the request. + if !hasProp(props, "class=qpu") { + t.Errorf("device constraint must include class=qpu; got %v", props) } - // no classical core forced on an exotic-only request - if _, ok := withType(js, "core"); !ok { - t.Error("quantum-only pod is currently required to request a core") + if len(props) != 2 { + t.Errorf("device constraint should be [virtual=true class=qpu]; got %v", props) + } + if c, ok := withType(device, "node"); !ok || c != 1 { + t.Errorf("device should request node count 1; got %d (ok=%v)", c, ok) } } -func TestGenericQubitCount(t *testing.T) { - // "at least 156 qubits" expressed as a count (Fluxion count match is >=). - p := podWith("q", corev1.ResourceList{FluxionResourcePrefix + "qubit": qty(156)}) - js, err := JobspecForGroup("qubits", []corev1.Pod{p}) +// A device-only request still forces a compute jobspec (the probing pod needs a +// node), so there are two matches: compute (core=1, virtual=false) and device. +func TestDeviceOnlyStillForcesCompute(t *testing.T) { + p := podWith("q", corev1.ResourceList{FluxionResourcePrefix + "qpu": qty(1)}) + specs, err := JobspecsForGroup("qonly", []corev1.Pod{p}, map[string]bool{"qpu": true}) if err != nil { t.Fatal(err) } - if c, ok := withType(js, "qubit"); !ok || c != 156 { - t.Fatalf("qubit = %d (ok=%v), want 156", c, ok) + if len(specs) != 2 { + t.Fatalf("expected 2 jobspecs, got %d", len(specs)) + } + if c, _ := withType(specs[0], "core"); c != 1 { + t.Errorf("forced compute core = %d, want 1", c) } } -func TestHybrid(t *testing.T) { - // cores AND a qpu in the same pod -> both appear in the slot. - p := podWith("h", corev1.ResourceList{ - corev1.ResourceCPU: qty(2), +// Requesting a device type the graph does not model is a hard error. +func TestUnknownDeviceErrors(t *testing.T) { + p := podWith("q", corev1.ResourceList{FluxionResourcePrefix + "fpga": qty(1)}) + _, err := JobspecsForGroup("grp", []corev1.Pod{p}, map[string]bool{"qpu": true}) + if err == nil { + t.Fatal("expected an error for an unmodeled device type") + } +} + +// duration is 0 (hold until cancel) on every generated jobspec. +func TestHoldDurationZero(t *testing.T) { + p := podWith("q", corev1.ResourceList{ + corev1.ResourceCPU: qty(1), FluxionResourcePrefix + "qpu": qty(1), }) - js, err := JobspecForGroup("hyb", []corev1.Pod{p}) + specs, err := JobspecsForGroup("g", []corev1.Pod{p}, map[string]bool{"qpu": true}) if err != nil { t.Fatal(err) } - if c, _ := withType(js, "core"); c != 2 { - t.Errorf("core = %d, want 2", c) - } - if c, _ := withType(js, "qpu"); c != 1 { - t.Errorf("qpu = %d, want 1", c) + for i, js := range specs { + sys := js.Attributes["system"].(map[string]interface{}) + if sys["duration"] != 0 { + t.Errorf("jobspec %d duration = %v, want 0", i, sys["duration"]) + } } } -func TestPlacementFromAllocation(t *testing.T) { +// A compute allocation: node vertices carrying virtual=false are bind targets. +func TestPlacementComputeNodes(t *testing.T) { alloc := `{"graph":{"nodes":[ - {"metadata":{"type":"node","name":"node-a"}}, + {"metadata":{"type":"node","name":"node-a","properties":{"virtual=false":""}}}, {"metadata":{"type":"core","name":"core0"}}, - {"metadata":{"type":"node","name":"node-b"}}, - {"metadata":{"type":"qpu","name":"ibm_fez"}}]}}` + {"metadata":{"type":"node","name":"node-b","properties":{"virtual=false":""}}}]}}` p, err := PlacementFromAllocation(alloc) if err != nil { t.Fatal(err) } if len(p.Nodes) != 2 || p.Nodes[0] != "node-a" || p.Nodes[1] != "node-b" { - t.Fatalf("nodes = %v", p.Nodes) + t.Fatalf("nodes = %v, want [node-a node-b]", p.Nodes) } - if p.Backend != "ibm_fez" { - t.Fatalf("backend = %q", p.Backend) + if p.Backend != "" { + t.Fatalf("compute allocation should have no backend, got %q", p.Backend) } } -func TestPlacementQuantumOnly(t *testing.T) { - // A pure-quantum allocation has a qpu (under qgateway) but NO node vertex. - // Nodes must be empty and Backend set — fluence then imposes no node constraint. +// A device allocation: the virtual=true node is the backend identity, not a bind +// target. Its qpu/qubit children do not become the backend name. +func TestPlacementVirtualBackend(t *testing.T) { alloc := `{"graph":{"nodes":[ - {"metadata":{"type":"cluster","name":"kind"}}, - {"metadata":{"type":"qgateway","name":"qgateway0"}}, - {"metadata":{"type":"qpu","name":"ibm_marrakesh"}}, + {"metadata":{"type":"node","name":"rigetti_cepheus","properties":{"virtual=true":"","class=qdevice":""}}}, + {"metadata":{"type":"qpu","name":"qpu0"}}, {"metadata":{"type":"qubit","name":"qubit0"}}]}}` p, err := PlacementFromAllocation(alloc) if err != nil { t.Fatal(err) } if len(p.Nodes) != 0 { - t.Fatalf("quantum-only allocation should have no nodes, got %v", p.Nodes) + t.Fatalf("device allocation should bind no compute node, got %v", p.Nodes) + } + if p.Backend != "rigetti_cepheus" { + t.Fatalf("backend = %q, want rigetti_cepheus (the virtual=true node)", p.Backend) + } +} + +// An unmarked node (graph built without markers) is treated as a bind target, so +// a plain classical graph still places correctly. +func TestPlacementUnmarkedNodeIsCompute(t *testing.T) { + alloc := `{"graph":{"nodes":[ + {"metadata":{"type":"node","name":"plain-node"}}]}}` + p, err := PlacementFromAllocation(alloc) + if err != nil { + t.Fatal(err) + } + if len(p.Nodes) != 1 || p.Nodes[0] != "plain-node" { + t.Fatalf("nodes = %v, want [plain-node]", p.Nodes) } - if p.Backend != "ibm_marrakesh" { - t.Fatalf("backend = %q, want ibm_marrakesh", p.Backend) + if p.Backend != "" { + t.Fatalf("unmarked node should not be a backend, got %q", p.Backend) } } diff --git a/pkg/webhook/webhook.go b/pkg/webhook/webhook.go index 08e8364..17a7b48 100644 --- a/pkg/webhook/webhook.go +++ b/pkg/webhook/webhook.go @@ -25,6 +25,7 @@ import ( "encoding/pem" "fmt" "io" + "log" "math/big" "net/http" "strings" @@ -49,42 +50,88 @@ type jsonPatchOp struct { Value any `json:"value,omitempty"` } -// backendEnv is the downward-API env injected into quantum containers. To the -// app it is an ordinary env var; its value comes from the fluence backend -// annotation, which the scheduler sets in PreBind. -func backendEnv() corev1.EnvVar { +// Mutator injects fluence's scheduler-chosen values into a pod's containers. It +// carries the env contract — the union of attribute keys across the configured +// backends — so it injects a stable, predictable set of environment variables +// regardless of which backend a given pod ends up matching. Values flow via the +// downward API from annotations the scheduler writes in PreBind, so the env var +// NAMES are fixed at pod-creation time (here) while their VALUES populate later. +type Mutator struct { + // AttributeKeys is the union of user attribute keys across all backends. Each + // becomes a FLUXION_ env var sourced from its attr- annotation. + AttributeKeys []string +} + +// injectedEnv returns the full normalized env set this mutator injects into a +// fluxion-requesting container: FLUXION_BACKEND plus one FLUXION_ per +// configured attribute key. Each reads its annotation via the downward API; an +// annotation the scheduler did not set resolves to empty, which is harmless. +func (m *Mutator) injectedEnv() []corev1.EnvVar { + envs := []corev1.EnvVar{annotationEnv( + placement.EnvVarPrefix+"BACKEND", placement.BackendAnnotation)} + for _, key := range m.AttributeKeys { + envs = append(envs, annotationEnv( + placement.EnvVarName(key), placement.AttributeAnnotationPrefix+key)) + } + return envs +} + +// EnvVarNames returns the names of every env var this mutator injects, for +// startup logging so the developer sees the exact contract their container can +// rely on. +func (m *Mutator) EnvVarNames() []string { + names := make([]string, 0, len(m.AttributeKeys)+1) + for _, e := range m.injectedEnv() { + names = append(names, e.Name) + } + return names +} + +// annotationEnv builds a downward-API env var that reads a pod annotation. +func annotationEnv(envName, annotationKey string) corev1.EnvVar { return corev1.EnvVar{ - Name: "QRMI_BACKEND", + Name: envName, ValueFrom: &corev1.EnvVarSource{ FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: fmt.Sprintf("metadata.annotations['%s']", placement.BackendAnnotation), + FieldPath: fmt.Sprintf("metadata.annotations['%s']", annotationKey), }, }, } } // Mutate returns the JSON Patch operations for a pod, or nil if nothing applies. -func Mutate(pod *corev1.Pod) []jsonPatchOp { +// For each container that requests a fluxion.flux-framework.org/* resource, it +// appends every contract env var the container does not already define. +func (m *Mutator) Mutate(pod *corev1.Pod) []jsonPatchOp { if pod.Spec.SchedulerName != SchedulerName { return nil } + contract := m.injectedEnv() var ops []jsonPatchOp for i, c := range pod.Spec.Containers { - if !requestsFluxionResource(c) || hasEnv(c, "QRMI_BACKEND") { + if !requestsFluxionResource(c) { continue } - if len(c.Env) == 0 { - ops = append(ops, jsonPatchOp{ - Op: "add", - Path: fmt.Sprintf("/spec/containers/%d/env", i), - Value: []corev1.EnvVar{backendEnv()}, - }) - } else { + for _, e := range contract { + if hasEnv(c, e.Name) { + continue + } + if len(c.Env) == 0 { + ops = append(ops, jsonPatchOp{ + Op: "add", + Path: fmt.Sprintf("/spec/containers/%d/env", i), + Value: []corev1.EnvVar{e}, + }) + // Subsequent vars append to the now-existing slice. + c.Env = []corev1.EnvVar{e} + continue + } ops = append(ops, jsonPatchOp{ Op: "add", Path: fmt.Sprintf("/spec/containers/%d/env/-", i), - Value: backendEnv(), + Value: e, }) + c.Env = append(c.Env, e) } } return ops @@ -110,7 +157,7 @@ func hasEnv(c corev1.Container, name string) bool { // Handler is the /mutate endpoint. It always admits the pod (failure to mutate // must not block creation); it only adds a patch when Mutate returns one. -func Handler(w http.ResponseWriter, r *http.Request) { +func (m *Mutator) Handler(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -125,11 +172,13 @@ func Handler(w http.ResponseWriter, r *http.Request) { resp := &admissionv1.AdmissionResponse{UID: review.Request.UID, Allowed: true} var pod corev1.Pod if err := json.Unmarshal(review.Request.Object.Raw, &pod); err == nil { - if ops := Mutate(&pod); len(ops) > 0 { + if ops := m.Mutate(&pod); len(ops) > 0 { if patch, err := json.Marshal(ops); err == nil { pt := admissionv1.PatchTypeJSONPatch resp.Patch = patch resp.PatchType = &pt + log.Printf("[fluence-webhook] injected %d env op(s) into pod %s/%s", + len(ops), pod.Namespace, pod.Name) } } } diff --git a/pkg/webhook/webhook_test.go b/pkg/webhook/webhook_test.go index c5a164c..6d97e40 100644 --- a/pkg/webhook/webhook_test.go +++ b/pkg/webhook/webhook_test.go @@ -8,7 +8,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" ) -func qpuPod(scheduler string, withEnv bool) *corev1.Pod { +func qpuPod(scheduler string, presetEnv string) *corev1.Pod { c := corev1.Container{ Name: "app", Resources: corev1.ResourceRequirements{ @@ -17,40 +17,101 @@ func qpuPod(scheduler string, withEnv bool) *corev1.Pod { }, }, } - if withEnv { - c.Env = []corev1.EnvVar{{Name: "QRMI_BACKEND", Value: "preset"}} + if presetEnv != "" { + c.Env = []corev1.EnvVar{{Name: presetEnv, Value: "preset"}} } return &corev1.Pod{Spec: corev1.PodSpec{SchedulerName: scheduler, Containers: []corev1.Container{c}}} } -func TestMutateInjectsBackendEnv(t *testing.T) { - ops := Mutate(qpuPod("fluence", false)) - if len(ops) != 1 { - t.Fatalf("want 1 op, got %d", len(ops)) +// envNames returns the env var names referenced by a list of add-ops. +func opEnvNames(ops []jsonPatchOp) []string { + var names []string + for _, op := range ops { + switch v := op.Value.(type) { + case corev1.EnvVar: + names = append(names, v.Name) + case []corev1.EnvVar: + for _, e := range v { + names = append(names, e.Name) + } + } } - if ops[0].Path != "/spec/containers/0/env" { - t.Errorf("path = %q", ops[0].Path) + return names +} + +func contains(names []string, want string) bool { + for _, n := range names { + if n == want { + return true + } + } + return false +} + +// With a config-derived contract (region, qubits), a fluxion pod gets +// FLUXION_BACKEND plus one FLUXION_ per attribute key. +func TestMutateInjectsContract(t *testing.T) { + m := &Mutator{AttributeKeys: []string{"region", "qubits"}} + ops := m.Mutate(qpuPod("fluence", "")) + names := opEnvNames(ops) + + for _, want := range []string{"FLUXION_BACKEND", "FLUXION_REGION", "FLUXION_QUBITS"} { + if !contains(names, want) { + t.Errorf("missing injected env %q; got %v", want, names) + } + } + if len(names) != 3 { + t.Errorf("expected exactly 3 env vars, got %v", names) } } +// With no configured attributes, only FLUXION_BACKEND is injected. +func TestMutateBackendOnly(t *testing.T) { + m := &Mutator{} + names := opEnvNames(m.Mutate(qpuPod("fluence", ""))) + if len(names) != 1 || names[0] != "FLUXION_BACKEND" { + t.Fatalf("want [FLUXION_BACKEND], got %v", names) + } +} + +// Non-fluence pods are never mutated. func TestMutateSkipsOtherScheduler(t *testing.T) { - if ops := Mutate(qpuPod("default-scheduler", false)); ops != nil { + m := &Mutator{AttributeKeys: []string{"region"}} + if ops := m.Mutate(qpuPod("default-scheduler", "")); ops != nil { t.Fatalf("non-fluence pod should not be mutated, got %v", ops) } } +// An env var the container already defines is not re-injected (idempotent / no +// override), while the others still are. func TestMutateRespectsExistingEnv(t *testing.T) { - if ops := Mutate(qpuPod("fluence", true)); ops != nil { - t.Fatalf("should not override an existing QRMI_BACKEND, got %v", ops) + m := &Mutator{AttributeKeys: []string{"region"}} + names := opEnvNames(m.Mutate(qpuPod("fluence", "FLUXION_BACKEND"))) + if contains(names, "FLUXION_BACKEND") { + t.Errorf("should not re-inject existing FLUXION_BACKEND; got %v", names) + } + if !contains(names, "FLUXION_REGION") { + t.Errorf("should still inject FLUXION_REGION; got %v", names) } } -func TestMutateSkipsNonQuantum(t *testing.T) { +// Classical pods (no fluxion resource request) are not mutated. +func TestMutateSkipsNonFluxion(t *testing.T) { + m := &Mutator{AttributeKeys: []string{"region"}} p := &corev1.Pod{Spec: corev1.PodSpec{ SchedulerName: "fluence", Containers: []corev1.Container{{Name: "c", Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{corev1.ResourceCPU: *resource.NewQuantity(1, resource.DecimalSI)}}}}, }} - if ops := Mutate(p); ops != nil { + if ops := m.Mutate(p); ops != nil { t.Fatalf("classical pod should not be mutated, got %v", ops) } } + +// EnvVarNames reports the full contract for startup logging. +func TestEnvVarNames(t *testing.T) { + m := &Mutator{AttributeKeys: []string{"region", "connectivity"}} + names := m.EnvVarNames() + if len(names) != 3 || names[0] != "FLUXION_BACKEND" { + t.Fatalf("EnvVarNames = %v, want FLUXION_BACKEND first then attrs", names) + } +}